Skip to content

Commit 74cce5d

Browse files
committed
Queue up multiple async writes to socket
1 parent 27f8e99 commit 74cce5d

File tree

1 file changed

+66
-45
lines changed

1 file changed

+66
-45
lines changed

plugins/net_plugin/net_plugin.cpp

+66-45
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ namespace eosio {
630630
_write_queue.clear();
631631
_sync_write_queue.clear();
632632
_trx_write_queue.clear();
633-
_write_queue_size = 0;
633+
//_write_queue_size = 0;
634634
_out_queue.clear();
635635
}
636636

@@ -639,32 +639,33 @@ namespace eosio {
639639
_write_queue.clear();
640640
_sync_write_queue.clear();
641641
_trx_write_queue.clear();
642-
_write_queue_size = 0;
642+
//_write_queue_size = 0;
643643
}
644644

645-
void clear_out_queue(boost::system::error_code ec, std::size_t number_of_bytes_written) {
645+
void clear_out_queue(uint32_t id, boost::system::error_code ec, std::size_t number_of_bytes_written) {
646646
fc::lock_guard g( _mtx );
647-
out_callback( ec, number_of_bytes_written );
648-
_out_queue.clear();
647+
out_callback( id, ec, number_of_bytes_written );
648+
_out_queue.erase(id);
649649
}
650650

651+
651652
uint32_t write_queue_size() const {
652653
fc::lock_guard g( _mtx );
653-
return _write_queue_size;
654+
return _write_queue.size() + _sync_write_queue.size() + _trx_write_queue.size() + out_queue_size();
654655
}
655656

656657
// called from connection strand
657-
bool ready_to_send(uint32_t connection_id) const {
658-
fc::unique_lock g( _mtx );
659-
// if out_queue is not empty then async_write is in progress
660-
const bool async_write_in_progress = !_out_queue.empty();
661-
const bool ready = !async_write_in_progress && _write_queue_size != 0;
662-
g.unlock();
663-
if (async_write_in_progress) {
664-
fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id));
665-
}
666-
return ready;
667-
}
658+
// bool ready_to_send(uint32_t connection_id) const {
659+
// fc::unique_lock g( _mtx );
660+
// // if out_queue is not empty then async_write is in progress
661+
// const bool async_write_in_progress = !_out_queue.empty();
662+
// const bool ready = !async_write_in_progress && _write_queue_size != 0;
663+
// g.unlock();
664+
// if (async_write_in_progress) {
665+
// fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id));
666+
// }
667+
// return ready;
668+
// }
668669

669670
enum class queue_t { block_sync, general };
670671
// @param callback must not callback into queued_buffer
@@ -680,44 +681,61 @@ namespace eosio {
680681
} else {
681682
_write_queue.emplace_back( buff, std::move(callback) );
682683
}
683-
_write_queue_size += buff->size();
684-
if( _write_queue_size > 2 * def_max_write_queue_size ) {
684+
if (_write_queue.size() + _sync_write_queue.size() + _trx_write_queue.size() + out_queue_size() > 2 * def_max_write_queue_size)
685685
return false;
686-
}
686+
// _write_queue_size += buff->size();
687+
// if( _write_queue_size > 2 * def_max_write_queue_size ) {
688+
// return false;
689+
// }
687690
return true;
688691
}
689692

690-
void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
693+
uint32_t fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
691694
fc::lock_guard g( _mtx );
695+
uint32_t id = 0;
692696
if (!_sync_write_queue.empty()) { // always send msgs from sync_write_queue first
693-
fill_out_buffer( bufs, _sync_write_queue );
697+
id = fill_out_buffer( bufs, _sync_write_queue );
694698
} else if (!_write_queue.empty()) { // always send msgs from write_queue before trx queue
695-
fill_out_buffer( bufs, _write_queue );
696-
} else {
697-
fill_out_buffer( bufs, _trx_write_queue );
698-
assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty() && _write_queue_size == 0);
699+
id = fill_out_buffer( bufs, _write_queue );
700+
} else if (!_trx_write_queue.empty()) {
701+
id = fill_out_buffer( bufs, _trx_write_queue );
702+
// assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty() && _write_queue_size == 0);
703+
assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty());
699704
}
705+
return id;
700706
}
701707

702708
private:
703709
struct queued_write;
704-
void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs,
705-
deque<queued_write>& w_queue ) REQUIRES(_mtx) {
710+
uint32_t fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs,
711+
deque<queued_write>& w_queue ) REQUIRES(_mtx) {
712+
++_out_queue_id;
706713
while ( !w_queue.empty() ) {
707714
auto& m = w_queue.front();
708715
bufs.emplace_back( m.buff->data(), m.buff->size() );
709-
_write_queue_size -= m.buff->size();
710-
_out_queue.emplace_back( m );
716+
//_write_queue_size -= m.buff->size();
717+
_out_queue[_out_queue_id].emplace_back( std::move(m) );
711718
w_queue.pop_front();
712719
}
720+
return _out_queue_id;
713721
}
714722

715-
void out_callback( boost::system::error_code ec, std::size_t number_of_bytes_written ) REQUIRES(_mtx) {
716-
for( auto& m : _out_queue ) {
723+
void out_callback( uint32_t id, boost::system::error_code ec, std::size_t number_of_bytes_written ) REQUIRES(_mtx) {
724+
for( auto& m : _out_queue.at(id) ) {
717725
m.callback( ec, number_of_bytes_written );
718726
}
719727
}
720728

729+
size_t out_queue_size() const REQUIRES(_mtx) {
730+
size_t size = 0;
731+
for (const auto& e : _out_queue) {
732+
for (const auto& b : e.second) {
733+
size = b.buff->size();
734+
}
735+
}
736+
return size;
737+
}
738+
721739
private:
722740
struct queued_write {
723741
std::shared_ptr<vector<char>> buff;
@@ -726,11 +744,12 @@ namespace eosio {
726744

727745
alignas(hardware_destructive_interference_sz)
728746
mutable fc::mutex _mtx;
729-
uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue + _sync_write_queue + _trx_write_queue
747+
uint32_t _out_queue_id GUARDED_BY(_mtx) {0};
748+
//uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue + _sync_write_queue + _trx_write_queue
730749
deque<queued_write> _write_queue GUARDED_BY(_mtx); // queued messages, all messages except sync & trxs
731750
deque<queued_write> _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue blocks will be sent first
732751
deque<queued_write> _trx_write_queue GUARDED_BY(_mtx); // queued trx messages, trx_write_queue will be sent last
733-
deque<queued_write> _out_queue GUARDED_BY(_mtx); // currently being async_write
752+
map<uint32_t, deque<queued_write>> _out_queue GUARDED_BY(_mtx); // currently being async_write
734753

735754
}; // queued_buffer
736755

@@ -1666,34 +1685,36 @@ namespace eosio {
16661685

16671686
// called from connection strand
16681687
void connection::do_queue_write(std::optional<block_num_type> block_num) {
1669-
if( !buffer_queue.ready_to_send(connection_id) ) {
1670-
if (block_num) {
1671-
peer_dlog(this, "connection currently sending, queueing block ${n}", ("n", *block_num) );
1672-
}
1673-
return;
1674-
}
1688+
// if( !buffer_queue.ready_to_send(connection_id) ) {
1689+
// if (block_num) {
1690+
// peer_dlog(this, "connection currently sending, queueing block ${n}", ("n", *block_num) );
1691+
// }
1692+
// return;
1693+
// }
16751694
if (closed()) {
16761695
peer_dlog(this, "connection closed, not sending queued write");
16771696
return;
16781697
}
16791698

16801699
std::vector<boost::asio::const_buffer> bufs;
1681-
buffer_queue.fill_out_buffer( bufs );
1700+
uint32_t id = buffer_queue.fill_out_buffer( bufs );
1701+
if (id == 0)
1702+
return;
16821703

16831704
boost::asio::async_write( *socket, bufs,
1684-
boost::asio::bind_executor( strand, [c=shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t w ) {
1705+
boost::asio::bind_executor( strand, [id, c=shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t w ) {
16851706
try {
16861707
peer_dlog(c, "async write complete");
16871708
// May have closed connection and cleared buffer_queue
16881709
if (!c->socket->is_open() && c->socket_is_open()) { // if socket_open then close not called
16891710
peer_ilog(c, "async write socket closed before callback");
1690-
c->buffer_queue.clear_out_queue(ec, w);
1711+
c->buffer_queue.clear_out_queue(id, ec, w);
16911712
c->close();
16921713
return;
16931714
}
16941715
if (socket != c->socket ) { // different socket, c must have created a new socket, make sure previous is closed
16951716
peer_ilog( c, "async write socket changed before callback");
1696-
c->buffer_queue.clear_out_queue(ec, w);
1717+
c->buffer_queue.clear_out_queue(id, ec, w);
16971718
boost::system::error_code ignore_ec;
16981719
socket->shutdown( tcp::socket::shutdown_both, ignore_ec );
16991720
socket->close( ignore_ec );
@@ -1712,7 +1733,7 @@ namespace eosio {
17121733
c->bytes_sent += w;
17131734
c->last_bytes_sent = c->get_time();
17141735

1715-
c->buffer_queue.clear_out_queue(ec, w);
1736+
c->buffer_queue.clear_out_queue(id, ec, w);
17161737

17171738
c->enqueue_sync_block();
17181739
c->do_queue_write(std::nullopt);

0 commit comments

Comments
 (0)