Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P: Add new peer-log-format options #1208

Merged
merged 5 commits into from
Feb 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
@@ -1015,9 +1015,9 @@ namespace eosio {
void enqueue( const net_message& msg );
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num, queued_buffer::queue_t queue );
void enqueue_buffer( msg_type_t net_msg,
std::optional<block_num_type> block_num,
queued_buffer::queue_t queue,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num,
go_away_reason close_after_send);
void cancel_sync();
void flush_queues();
@@ -1028,10 +1028,11 @@ namespace eosio {
void sync_wait();

void queue_write(msg_type_t net_msg,
std::optional<block_num_type> block_num,
queued_buffer::queue_t queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback);
void do_queue_write();
void do_queue_write(std::optional<block_num_type> block_num);

bool is_valid( const handshake_message& msg ) const;

@@ -1070,14 +1071,16 @@ namespace eosio {

fc::variant_object get_logger_variant() const {
fc::mutable_variant_object mvo;
mvo( "_name", log_p2p_address)
mvo( "_peer", peer_addr.empty() ? log_p2p_address : peer_addr )
( "_name", log_p2p_address)
( "_cid", connection_id )
( "_id", conn_node_id )
( "_sid", short_conn_node_id )
( "_ip", log_remote_endpoint_ip )
( "_port", log_remote_endpoint_port )
( "_lip", local_endpoint_ip )
( "_lport", local_endpoint_port );
( "_lport", local_endpoint_port )
( "_nver", protocol_version.load() );
return mvo;
}

@@ -1649,6 +1652,7 @@ namespace eosio {

// called from connection strand
void connection::queue_write(msg_type_t net_msg,
std::optional<block_num_type> block_num,
queued_buffer::queue_t queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback) {
@@ -1657,20 +1661,27 @@ namespace eosio {
close();
return;
}
do_queue_write();
do_queue_write(block_num);
}

// called from connection strand
void connection::do_queue_write() {
if( !buffer_queue.ready_to_send(connection_id) || closed() )
void connection::do_queue_write(std::optional<block_num_type> block_num) {
if( !buffer_queue.ready_to_send(connection_id) ) {
if (block_num) {
peer_dlog(this, "connection currently sending, queueing block ${n}", ("n", *block_num) );
}
return;
connection_ptr c(shared_from_this());
}
if (closed()) {
peer_dlog(this, "connection closed, not sending queued write");
return;
}

std::vector<boost::asio::const_buffer> bufs;
buffer_queue.fill_out_buffer( bufs );

boost::asio::async_write( *socket, bufs,
boost::asio::bind_executor( strand, [c, socket=socket]( boost::system::error_code ec, std::size_t w ) {
boost::asio::bind_executor( strand, [c=shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t w ) {
try {
peer_dlog(c, "async write complete");
// May have closed connection and cleared buffer_queue
@@ -1704,7 +1715,7 @@ namespace eosio {
c->buffer_queue.clear_out_queue(ec, w);

c->enqueue_sync_block();
c->do_queue_write();
c->do_queue_write(std::nullopt);
} catch ( const std::bad_alloc& ) {
throw;
} catch ( const boost::interprocess::bad_alloc& ) {
@@ -1926,7 +1937,7 @@ namespace eosio {

buffer_factory buff_factory;
const auto& send_buffer = buff_factory.get_send_buffer( m );
enqueue_buffer( to_msg_type_t(m.index()), queued_buffer::queue_t::general, send_buffer, 0, close_after_send );
enqueue_buffer( to_msg_type_t(m.index()), std::nullopt, queued_buffer::queue_t::general, send_buffer, close_after_send );
}

// called from connection strand
@@ -1937,28 +1948,28 @@ namespace eosio {
block_buffer_factory buff_factory;
const auto& sb = buff_factory.get_send_buffer( b );
latest_blk_time = std::chrono::steady_clock::now();
enqueue_buffer( msg_type_t::signed_block, queue, sb, block_num, no_reason);
enqueue_buffer( msg_type_t::signed_block, block_num, queue, sb, no_reason);
return sb->size();
}

// called from connection strand
void connection::enqueue_buffer( msg_type_t net_msg,
std::optional<block_num_type> block_num, // only valid for net_msg == signed_block variant which
queued_buffer::queue_t queue,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num, // only valid for net_msg == signed_block variant which
go_away_reason close_after_send)
{
connection_ptr self = shared_from_this();
queue_write(net_msg, queue, send_buffer,
[conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t ) {
queue_write(net_msg, block_num, queue, send_buffer,
[conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t s) {
if (ec) {
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_reset && conn->socket_is_open()) {
fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.message()));
}
return;
}
if (net_msg == msg_type_t::signed_block)
fc_dlog(logger, "Connection - ${cid} - done sending block ${bn}", ("cid", conn->connection_id)("bn", block_num));
if (net_msg == msg_type_t::signed_block && block_num)
fc_dlog(logger, "Connection - ${cid} - done sending block ${bn}", ("cid", conn->connection_id)("bn", *block_num));
if (close_after_send != no_reason) {
fc_ilog( logger, "sent a go away message: ${r}, closing connection ${cid}",
("r", reason_str(close_after_send))("cid", conn->connection_id) );
@@ -2799,7 +2810,7 @@ namespace eosio {
boost::asio::post(cp->strand, [cp, send_buffer{std::move(send_buffer)}, bnum]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::block_notice_message, queued_buffer::queue_t::general, send_buffer, 0, no_reason );
cp->enqueue_buffer( msg_type_t::block_notice_message, std::nullopt, queued_buffer::queue_t::general, send_buffer, no_reason );
});
return;
}
@@ -2813,7 +2824,7 @@ namespace eosio {
bool has_block = cp->peer_fork_db_root_num >= bnum;
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::signed_block, queued_buffer::queue_t::general, sb, bnum, no_reason );
cp->enqueue_buffer( msg_type_t::signed_block, bnum, queued_buffer::queue_t::general, sb, no_reason );
}
});
} );
@@ -2827,7 +2838,7 @@ namespace eosio {
boost::asio::post(cp->strand, [cp, msg]() {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
cp->enqueue_buffer( msg_type_t::vote_message, queued_buffer::queue_t::general, msg, 0, no_reason );
cp->enqueue_buffer( msg_type_t::vote_message, std::nullopt, queued_buffer::queue_t::general, msg, no_reason );
});
return true;
} );
@@ -2848,7 +2859,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) );
boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( msg_type_t::packed_transaction, queued_buffer::queue_t::general, sb, 0, no_reason );
cp->enqueue_buffer( msg_type_t::packed_transaction, std::nullopt, queued_buffer::queue_t::general, sb, no_reason );
} );
} );
}
@@ -3296,7 +3307,7 @@ namespace eosio {
buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( block_nack_message{block_id} );

enqueue_buffer( msg_type_t::block_nack_message, queued_buffer::queue_t::general, send_buffer, 0, no_reason );
enqueue_buffer( msg_type_t::block_nack_message, std::nullopt, queued_buffer::queue_t::general, send_buffer, no_reason );
}

void net_plugin_impl::plugin_shutdown() {
@@ -4380,17 +4391,19 @@ namespace eosio {
( "sync-peer-limit", bpo::value<uint32_t>()->default_value(3),
"Number of peers to sync from")
( "use-socket-read-watermark", bpo::value<bool>()->default_value(false), "Enable experimental socket read watermark optimization")
( "peer-log-format", bpo::value<string>()->default_value( "[\"${_name}\" - ${_cid} ${_ip}:${_port}] " ),
( "peer-log-format", bpo::value<string>()->default_value( "[\"${_peer}\" - ${_cid} ${_ip}:${_port}] " ),
"The string used to format peers when logging messages about them. Variables are escaped with ${<variable name>}.\n"
"Available Variables:\n"
" _peer \tendpoint name\n\n"
" _name \tself-reported name\n\n"
" _cid \tassigned connection id\n\n"
" _id \tself-reported ID (64 hex characters)\n\n"
" _sid \tfirst 8 characters of _peer.id\n\n"
" _ip \tremote IP address of peer\n\n"
" _port \tremote port number of peer\n\n"
" _lip \tlocal IP address connected to peer\n\n"
" _lport \tlocal port number connected to peer\n\n")
" _lport \tlocal port number connected to peer\n\n"
" _nver \tp2p protocol version\n\n")
( "p2p-keepalive-interval-ms", bpo::value<int>()->default_value(def_keepalive_interval), "peer heartbeat keepalive message interval in milliseconds")

;