Skip to content

Commit

Permalink
issue: 4183221 propagate socket errors
Browse files Browse the repository at this point in the history
In case we got an ECQE - set the socket in a closing state.
Effectively this sends a TCP-RST.

Signed-off-by: Tomer Cabouly <tcabouly@nvidia.com>
  • Loading branch information
tomerdbz committed Dec 10, 2024
1 parent c92a5bc commit fa87745
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 68 deletions.
9 changes: 9 additions & 0 deletions src/core/dev/cq_mgr_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <util/valgrind.h>
#include <sock/sock-redirect.h>
#include <sock/sock-app.h>
#include "lwip/tcp.h"
#include "ring_simple.h"
#include "hw_queue_tx.h"

Expand Down Expand Up @@ -214,6 +215,14 @@ int cq_mgr_tx::poll_and_process_element_tx(uint64_t *p_cq_poll_sn)
if (unlikely(cqe->op_own & 0x80) && is_error_opcode(cqe->op_own >> 4)) {
// m_p_cq_stat->n_tx_cqe_error++; Future counter
log_cqe_error(cqe);

tcp_pcb *pcb = m_hqtx_ptr->m_sq_wqe_idx_to_prop[index].pcb;

cq_logwarn("closing %p", pcb);
const auto tcp_state = get_tcp_state(pcb);
if (tcp_state != CLOSING && tcp_state != CLOSED) {
TCP_EVENT_ERR(pcb->errf, pcb->my_container, ERR_RST);
}
}

handle_sq_wqe_prop(index);
Expand Down
47 changes: 26 additions & 21 deletions src/core/dev/hw_queue_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
#define OCTOWORD 16
#define WQEBB 64

//#define DBG_DUMP_WQE 1
// #define DBG_DUMP_WQE 1

#ifdef DBG_DUMP_WQE
#define dbg_dump_wqe(_addr, _size) \
Expand Down Expand Up @@ -348,8 +348,7 @@ void hw_queue_tx::release_tx_buffers()
NOT_IN_USE(ret); // Suppress --enable-opt-log=high warning
}

void hw_queue_tx::send_wqe(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr, xlio_tis *tis,
unsigned credits)
void hw_queue_tx::send_wqe(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr, unsigned credits)
{
mem_buf_desc_t *p_mem_buf_desc = (mem_buf_desc_t *)p_send_wqe->wr_id;
/* Control tx completions:
Expand All @@ -362,11 +361,11 @@ void hw_queue_tx::send_wqe(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr
* m_n_unsignaled_count must be zero for this time.
*/
const bool request_comp = (p_mem_buf_desc->m_flags & mem_buf_desc_t::ZCOPY);
const bool skip_tx_poll = (attr & XLIO_TX_SKIP_POLL);
const bool skip_tx_poll = (attr.flags & XLIO_TX_SKIP_POLL);

hwqtx_logfunc("VERBS send, unsignaled_count: %d", m_n_unsignaled_count);

send_to_wire(p_send_wqe, attr, request_comp, tis, credits);
send_to_wire(p_send_wqe, attr, request_comp, credits);

if (!skip_tx_poll && is_signal_requested_for_last_wqe()) {
uint64_t dummy_poll_sn = 0;
Expand Down Expand Up @@ -826,27 +825,33 @@ inline int hw_queue_tx::fill_wqe_lso(xlio_ibv_send_wr *pswr, int data_len)
return wqebbs;
}

void hw_queue_tx::store_current_wqe_prop(mem_buf_desc_t *buf, unsigned credits, xlio_ti *ti)
void hw_queue_tx::store_current_wqe_prop(mem_buf_desc_t *buf, unsigned credits, xlio_ti *ti,
tcp_pcb *pcb /*=nullptr*/)
{
m_sq_wqe_idx_to_prop[m_sq_wqe_hot_index] = sq_wqe_prop {
.buf = buf,
.credits = credits,
.ti = ti,
.next = m_sq_wqe_prop_last,
};
m_sq_wqe_idx_to_prop[m_sq_wqe_hot_index] = sq_wqe_prop {.buf = buf,
.credits = credits,
.ti = ti,
.next = m_sq_wqe_prop_last,
.pcb = pcb};
m_sq_wqe_prop_last = &m_sq_wqe_idx_to_prop[m_sq_wqe_hot_index];
if (ti) {
ti->get();
}
}

void hw_queue_tx::store_current_wqe_prop(mem_buf_desc_t *buf, unsigned credits,
xlio_send_attr &attr)
{
store_current_wqe_prop(buf, credits, attr.tis, attr.pcb);
}

//! Send one RAW packet
void hw_queue_tx::send_to_wire(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr,
bool request_comp, xlio_tis *tis, unsigned credits)
void hw_queue_tx::send_to_wire(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr,
bool request_comp, unsigned credits)
{
struct xlio_mlx5_wqe_ctrl_seg *ctrl = nullptr;
struct mlx5_wqe_eth_seg *eseg = nullptr;
uint32_t tisn = tis ? tis->get_tisn() : 0;
uint32_t tisn = attr.tis ? attr.tis->get_tisn() : 0;

ctrl = (struct xlio_mlx5_wqe_ctrl_seg *)m_sq_wqe_hot;
eseg = (struct mlx5_wqe_eth_seg *)((uint8_t *)m_sq_wqe_hot + sizeof(*ctrl));
Expand All @@ -866,10 +871,11 @@ void hw_queue_tx::send_to_wire(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_a
*/
*((uint64_t *)eseg) = 0;
eseg->rsvd2 = 0;
eseg->cs_flags = (uint8_t)(attr & (XLIO_TX_PACKET_L3_CSUM | XLIO_TX_PACKET_L4_CSUM) & 0xff);
eseg->cs_flags =
(uint8_t)(attr.flags & (XLIO_TX_PACKET_L3_CSUM | XLIO_TX_PACKET_L4_CSUM) & 0xff);

/* Store buffer descriptor */
store_current_wqe_prop(reinterpret_cast<mem_buf_desc_t *>(p_send_wqe->wr_id), credits, tis);
store_current_wqe_prop(reinterpret_cast<mem_buf_desc_t *>(p_send_wqe->wr_id), credits, attr);

/* Complete WQE */
int wqebbs = fill_wqe(p_send_wqe);
Expand Down Expand Up @@ -1519,10 +1525,9 @@ void hw_queue_tx::trigger_completion_for_all_sent_packets()
hwqtx_logdbg("No space in SQ to trigger completions with a post operation");
return;
}

send_to_wire(&send_wr,
(xlio_wr_tx_packet_attr)(XLIO_TX_PACKET_L3_CSUM | XLIO_TX_PACKET_L4_CSUM),
true, nullptr, credits);
xlio_send_attr send_attrs =
(xlio_wr_tx_packet_attr)(XLIO_TX_PACKET_L3_CSUM | XLIO_TX_PACKET_L4_CSUM);
send_to_wire(&send_wr, send_attrs, true, credits);
}
}

Expand Down
16 changes: 11 additions & 5 deletions src/core/dev/hw_queue_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include "dev/cq_mgr_tx.h"
#include "dev/cq_mgr_rx.h"
#include "dev/dm_mgr.h"
#include "lwip/tcp.h"
#include "proto/dst_entry.h"
#include "proto/mem_buf_desc.h"
#include "proto/xlio_lwip.h"
#include "util/sg_array.h"
Expand Down Expand Up @@ -73,6 +75,7 @@ struct sq_wqe_prop {
/* Transport interface (TIS/TIR) current WQE holds reference to. */
xlio_ti *ti;
struct sq_wqe_prop *next;
tcp_pcb *pcb;
};

// @class hw_queue_tx
Expand All @@ -90,8 +93,7 @@ class hw_queue_tx : public xlio_ti_owner {
void up();
void down();

void send_wqe(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr, xlio_tis *tis,
unsigned credits);
void send_wqe(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr, unsigned credits);

struct ibv_qp *get_ibv_qp() const { return m_mlx5_qp.qp; };

Expand Down Expand Up @@ -215,8 +217,8 @@ class hw_queue_tx : public xlio_ti_owner {
void destroy_tis_cache();
void put_tls_tis_in_cache(xlio_tis *tis);

void send_to_wire(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr, bool request_comp,
xlio_tis *tis, unsigned credits);
void send_to_wire(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr, bool request_comp,
unsigned credits);

void set_unsignaled_count(void) { m_n_unsignaled_count = m_n_sysvar_tx_num_wr_to_signal - 1; }

Expand Down Expand Up @@ -260,7 +262,11 @@ class hw_queue_tx : public xlio_ti_owner {
inline void tls_get_progress_params_wqe(xlio_ti *ti, uint32_t tirn, void *buf, uint32_t lkey);
#endif /* DEFINED_UTLS */

inline void store_current_wqe_prop(mem_buf_desc_t *wr_id, unsigned credits, xlio_ti *ti);
inline void store_current_wqe_prop(mem_buf_desc_t *wr_id, unsigned credits,
xlio_send_attr &attr);

inline void store_current_wqe_prop(mem_buf_desc_t *wr_id, unsigned credits, xlio_ti *ti,
tcp_pcb *pcb = nullptr);
inline int fill_wqe(xlio_ibv_send_wr *p_send_wqe);
inline int fill_wqe_send(xlio_ibv_send_wr *pswr);
inline int fill_wqe_lso(xlio_ibv_send_wr *pswr, int data_len);
Expand Down
5 changes: 3 additions & 2 deletions src/core/dev/ring.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct xlio_tls_info;
class sockinfo;
class rfs_rule;
class poll_group;
struct xlio_send_attr;

#define ring_logpanic __log_info_panic
#define ring_logerr __log_info_err
Expand Down Expand Up @@ -101,7 +102,7 @@ class ring {
virtual void send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr) = 0;
virtual int send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis) = 0;
xlio_send_attr &attr) = 0;

virtual int get_num_resources() const = 0;
virtual int *get_rx_channel_fds(size_t &length) const
Expand Down Expand Up @@ -233,7 +234,7 @@ class ring {
};

virtual int get_supported_nvme_feature_mask() const { return 0; }
virtual void post_nop_fence(void) {}
virtual void post_nop_fence(void) { }
virtual void post_dump_wqe(xlio_tis *tis, void *addr, uint32_t len, uint32_t lkey, bool first)
{
NOT_IN_USE(tis);
Expand Down
4 changes: 2 additions & 2 deletions src/core/dev/ring_bond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,14 @@ void ring_bond::send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe
}

int ring_bond::send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis)
xlio_send_attr &attr)
{
mem_buf_desc_t *p_mem_buf_desc = (mem_buf_desc_t *)(p_send_wqe->wr_id);

std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);

if (is_active_member(p_mem_buf_desc->p_desc_owner, id)) {
return m_xmit_rings[id]->send_lwip_buffer(id, p_send_wqe, attr, tis);
return m_xmit_rings[id]->send_lwip_buffer(id, p_send_wqe, attr);
}

ring_logfunc("active ring=%p, silent packet drop (%p), (HA event?)", m_xmit_rings[id],
Expand Down
2 changes: 1 addition & 1 deletion src/core/dev/ring_bond.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ring_bond : public ring {
virtual void send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr);
virtual int send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis);
xlio_send_attr &attr);
virtual void mem_buf_desc_return_single_to_owner_tx(mem_buf_desc_t *p_mem_buf_desc);
virtual void mem_buf_desc_return_single_multi_ref(mem_buf_desc_t *p_mem_buf_desc, unsigned ref);
virtual bool is_member(ring_slave *rng);
Expand Down
16 changes: 9 additions & 7 deletions src/core/dev/ring_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <mutex>
#include "ring_simple.h"

#include "proto/dst_entry.h"
#include "util/valgrind.h"
#include "util/sg_array.h"
#include "sock/fd_collection.h"
Expand Down Expand Up @@ -716,15 +717,15 @@ void ring_simple::mem_buf_rx_release(mem_buf_desc_t *p_mem_buf_desc)
}

/* note that this function is inline, so keep it above the functions using it */
inline int ring_simple::send_buffer(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr,
xlio_tis *tis)
inline int ring_simple::send_buffer(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr)
{
int ret = 0;
unsigned credits = m_hqtx->credits_calculate(p_send_wqe);

if (likely(m_hqtx->credits_get(credits)) ||
is_available_qp_wr(is_set(attr, XLIO_TX_PACKET_BLOCK), credits)) {
m_hqtx->send_wqe(p_send_wqe, attr, tis, credits);
is_available_qp_wr(is_set(attr.flags, XLIO_TX_PACKET_BLOCK), credits)) {
xlio_send_attr send_attr = attr;
m_hqtx->send_wqe(p_send_wqe, send_attr, credits);
} else {
ring_logdbg("Silent packet drop, SQ is full!");
ret = -1;
Expand Down Expand Up @@ -754,16 +755,17 @@ void ring_simple::send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_w
}

std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);
int ret = send_buffer(p_send_wqe, attr, nullptr);
xlio_send_attr send_attr = attr;
int ret = send_buffer(p_send_wqe, send_attr);
send_status_handler(ret, p_send_wqe);
}

int ring_simple::send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis)
xlio_send_attr &attr)
{
NOT_IN_USE(id);
std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);
int ret = send_buffer(p_send_wqe, attr, tis);
int ret = send_buffer(p_send_wqe, attr);
send_status_handler(ret, p_send_wqe);
return ret;
}
Expand Down
5 changes: 2 additions & 3 deletions src/core/dev/ring_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ class ring_simple : public ring_slave {
void mem_buf_desc_return_to_owner_tx(mem_buf_desc_t *p_mem_buf_desc);
void mem_buf_desc_return_to_owner_rx(mem_buf_desc_t *p_mem_buf_desc,
void *pv_fd_ready_array = nullptr);
inline int send_buffer(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr,
xlio_tis *tis);
inline int send_buffer(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr);
bool is_up() override;
void start_active_queue_tx();
void start_active_queue_rx();
Expand All @@ -97,7 +96,7 @@ class ring_simple : public ring_slave {
void send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr) override;
int send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis) override;
xlio_send_attr &attr) override;
void mem_buf_desc_return_single_to_owner_tx(mem_buf_desc_t *p_mem_buf_desc) override;
void mem_buf_desc_return_single_multi_ref(mem_buf_desc_t *p_mem_buf_desc,
unsigned ref) override;
Expand Down
13 changes: 7 additions & 6 deletions src/core/dev/ring_tap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "ring_tap.h"

#include <linux/if_tun.h>
#include "proto/dst_entry.h"
#include "util/sg_array.h"
#include "sock/fd_collection.h"
#include "dev/net_device_table_mgr.h"
Expand Down Expand Up @@ -341,17 +342,17 @@ void ring_tap::send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
attr & XLIO_TX_PACKET_L4_CSUM);

std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);
int ret = send_buffer(p_send_wqe, attr);
xlio_send_attr send_attr = attr;
int ret = send_buffer(p_send_wqe, send_attr);
send_status_handler(ret, p_send_wqe);
}

int ring_tap::send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis)
xlio_send_attr &attr)
{
NOT_IN_USE(id);
NOT_IN_USE(tis);
compute_tx_checksum((mem_buf_desc_t *)(p_send_wqe->wr_id), attr & XLIO_TX_PACKET_L3_CSUM,
attr & XLIO_TX_PACKET_L4_CSUM);
compute_tx_checksum((mem_buf_desc_t *)(p_send_wqe->wr_id), attr.flags & XLIO_TX_PACKET_L3_CSUM,
attr.flags & XLIO_TX_PACKET_L4_CSUM);

std::lock_guard<decltype(m_lock_ring_tx)> lock(m_lock_ring_tx);
int ret = send_buffer(p_send_wqe, attr);
Expand Down Expand Up @@ -597,7 +598,7 @@ int ring_tap::mem_buf_tx_release(mem_buf_desc_t *buff_list, bool b_accounting, b
return count;
}

int ring_tap::send_buffer(xlio_ibv_send_wr *wr, xlio_wr_tx_packet_attr attr)
int ring_tap::send_buffer(xlio_ibv_send_wr *wr, xlio_send_attr &attr)
{
int ret = 0;
iovec iovec[wr->num_sge];
Expand Down
8 changes: 4 additions & 4 deletions src/core/dev/ring_tap.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ring_tap : public ring_slave {
virtual void send_ring_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr);
virtual int send_lwip_buffer(ring_user_id_t id, xlio_ibv_send_wr *p_send_wqe,
xlio_wr_tx_packet_attr attr, xlio_tis *tis);
xlio_send_attr &attr);
virtual void mem_buf_desc_return_single_to_owner_tx(mem_buf_desc_t *p_mem_buf_desc);
virtual void mem_buf_desc_return_single_multi_ref(mem_buf_desc_t *p_mem_buf_desc, unsigned ref);
virtual mem_buf_desc_t *mem_buf_tx_get(ring_user_id_t id, bool b_block, pbuf_type type,
Expand All @@ -85,7 +85,7 @@ class ring_tap : public ring_slave {
NOT_IN_USE(poll_sn);
return 0;
}
virtual void adapt_cq_moderation() {}
virtual void adapt_cq_moderation() { }

virtual int socketxtreme_poll(struct xlio_socketxtreme_completion_t *xlio_completions,
unsigned int ncompletions, int flags)
Expand All @@ -101,7 +101,7 @@ class ring_tap : public ring_slave {
NOT_IN_USE(rate_limit);
return 0;
}
void inc_cq_moderation_stats() {}
void inc_cq_moderation_stats() { }
virtual uint32_t get_tx_user_lkey(void *addr, size_t length)
{
NOT_IN_USE(addr);
Expand Down Expand Up @@ -134,7 +134,7 @@ class ring_tap : public ring_slave {
int prepare_flow_message(xlio_msg_flow &data, msg_flow_t flow_action);
int process_element_rx(void *pv_fd_ready_array);
bool request_more_rx_buffers();
int send_buffer(xlio_ibv_send_wr *p_send_wqe, xlio_wr_tx_packet_attr attr);
int send_buffer(xlio_ibv_send_wr *p_send_wqe, xlio_send_attr &attr);
void send_status_handler(int ret, xlio_ibv_send_wr *p_send_wqe);
void tap_create(net_device_val *p_ndev);
void tap_destroy();
Expand Down
Loading

0 comments on commit fa87745

Please sign in to comment.