diff --git a/src/core/dev/cq_mgr_tx.cpp b/src/core/dev/cq_mgr_tx.cpp
index b3ea63367..ef290810a 100644
--- a/src/core/dev/cq_mgr_tx.cpp
+++ b/src/core/dev/cq_mgr_tx.cpp
@@ -214,6 +214,13 @@ int cq_mgr_tx::poll_and_process_element_tx(uint64_t *p_cq_poll_sn)
         if (unlikely(cqe->op_own & 0x80) && is_error_opcode(cqe->op_own >> 4)) {
             // m_p_cq_stat->n_tx_cqe_error++; Future counter
             log_cqe_error(cqe);
+
+            // Mark the buffer so that a later retransmit attempt can detect the failure.
+            // NOTE(review): 'buf' is presumably null for WQEs posted without a buffer - confirm.
+            auto *err_buf = m_hqtx_ptr->m_sq_wqe_idx_to_prop[index].buf;
+            if (err_buf) {
+                err_buf->m_flags |= mem_buf_desc_t::INVALID;
+            }
         }
 
         handle_sq_wqe_prop(index);
diff --git a/src/core/proto/mem_buf_desc.h b/src/core/proto/mem_buf_desc.h
index 326e28eb9..aa5c3ac8d 100644
--- a/src/core/proto/mem_buf_desc.h
+++ b/src/core/proto/mem_buf_desc.h
@@ -69,7 +69,12 @@ struct timestamps_t {
  */
 class mem_buf_desc_t {
 public:
-    enum flags { TYPICAL = 0, CLONED = 0x01, ZCOPY = 0x02 };
+    enum flags {
+        TYPICAL = 0,
+        CLONED = 0x01,
+        ZCOPY = 0x02,
+        INVALID = 0x04, // Set by the TX CQ handler when the buffer completed with an error CQE
+    };
 
 public:
     mem_buf_desc_t(uint8_t *buffer, size_t size, pbuf_type type)
diff --git a/src/core/sock/sockinfo_tcp.cpp b/src/core/sock/sockinfo_tcp.cpp
index 503779df2..cb9e9bc12 100644
--- a/src/core/sock/sockinfo_tcp.cpp
+++ b/src/core/sock/sockinfo_tcp.cpp
@@ -1347,6 +1347,21 @@ ssize_t sockinfo_tcp::tcp_tx_slow_path(xlio_tx_call_attr_t &tx_arg)
     return tcp_tx_handle_done_and_unlock(total_tx, errno_tmp, is_dummy, is_send_zerocopy);
 }
 
+// Returns true (after raising TCP_EVENT_ERR with ERR_RST) when a retransmitted buffer
+// carries the mem_buf_desc_t::INVALID mark; returns false in every other case.
+static bool is_socket_in_error_state(const struct pbuf *p, struct tcp_pcb *pcb, uint16_t flags)
+{
+    if (unlikely(flags & XLIO_TX_PACKET_REXMIT)) {
+        // Retransmit: check for the mark left on the buffer by an earlier error CQE.
+        const mem_buf_desc_t *mem_buf_desc = (const mem_buf_desc_t *)p;
+        if (mem_buf_desc->m_flags & mem_buf_desc_t::INVALID) {
+            TCP_EVENT_ERR(pcb->errf, pcb->my_container, ERR_RST);
+            return true;
+        }
+    }
+    return false;
+}
+
 /*
  * TODO Remove 'p' from the interface and use 'seg'.
* There are multiple places where ip_output() is used without allocating @@ -1372,6 +1387,10 @@ err_t sockinfo_tcp::ip_output(struct pbuf *p, struct tcp_seg *seg, void *v_p_con int count = 0; void *cur_end; + if (unlikely(is_socket_in_error_state(p, (struct tcp_pcb *)v_p_conn, flags))) { + return ERR_RST; + } + int rc = p_si_tcp->m_ops->postrouting(p, seg, attr); if (rc != 0) { return rc; @@ -1582,7 +1601,8 @@ void sockinfo_tcp::err_lwip_cb(void *pcb_container, err_t err) } else { NOTIFY_ON_EVENTS(conn, (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)); } - /* TODO what about no route to host type of errors, need to add EPOLLERR in this case ? + /* TODO what about no route to host type of errors, need to add EPOLLERR in this + * case ? */ } else { // ERR_TIMEOUT NOTIFY_ON_EVENTS(conn, (EPOLLIN | EPOLLHUP)); @@ -1590,8 +1610,9 @@ void sockinfo_tcp::err_lwip_cb(void *pcb_container, err_t err) /* SOCKETXTREME comment: * Add this fd to the ready fd list - * Note: No issue is expected in case socketxtreme_poll() usage because 'pv_fd_ready_array' - * is null in such case and as a result update_fd_array() call means nothing + * Note: No issue is expected in case socketxtreme_poll() usage because + * 'pv_fd_ready_array' is null in such case and as a result update_fd_array() call means + * nothing */ io_mux_call::update_fd_array(conn->m_iomux_ready_fd_array, conn->m_fd); @@ -1630,7 +1651,8 @@ bool sockinfo_tcp::process_peer_ctl_packets(xlio_desc_list_t &peer_packets) return false; } - // Listen socket is 3T and so rx.src/dst are set as part of rx_process_buffer_no_flow_id. + // Listen socket is 3T and so rx.src/dst are set as part of + // rx_process_buffer_no_flow_id. 
struct tcp_pcb *pcb = get_syn_received_pcb(desc->rx.src, desc->rx.dst); // 2.1.2 get the pcb and sockinfo @@ -1743,8 +1765,8 @@ void sockinfo_tcp::process_my_ctl_packets() } // prepare for next map iteration if (peer_packets.empty()) { - m_rx_peer_packets.erase(itr++); // // advance itr before invalidating it by erase (itr++ - // returns the value before advance) + m_rx_peer_packets.erase(itr++); // // advance itr before invalidating it by erase + // (itr++ returns the value before advance) } else { ++itr; } @@ -1879,9 +1901,9 @@ void sockinfo_tcp::handle_incoming_handshake_failure(sockinfo_tcp *child_conn) // we finish with the current processing. XLIO_CALL(close, child_conn->get_fd()); } else { - // With delegate mode calling close() will destroy the socket object and cause access after - // free in the subsequent flows. Instead, we add the socket for postponed close that will be - // as part of the delegate timer. + // With delegate mode calling close() will destroy the socket object and cause access + // after free in the subsequent flows. Instead, we add the socket for postponed close + // that will be as part of the delegate timer. g_event_handler_manager_local.add_close_postponed_socket(child_conn); } } @@ -5112,7 +5134,7 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool blocking) m_rx_ring_map_lock.unlock(); lock_tcp_con(); // We must take a lock before checking m_n_rx_pkt_ready_list_count - if (likely(m_n_rx_pkt_ready_list_count)) { // got completions from CQ + if (likely(m_n_rx_pkt_ready_list_count)) { // Got completions from CQ __log_entry_funcall("Ready %d packets. 
sn=%llu", m_n_rx_pkt_ready_list_count, (unsigned long long)poll_sn); IF_STATS(m_p_socket_stats->counters.n_rx_poll_hit++); @@ -5125,9 +5147,9 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool blocking) unlock_tcp_con(); // Must happen before g_event_handler_manager_local.do_tasks(); if (safe_mce_sys().tcp_ctl_thread == option_tcp_ctl_thread::CTL_THREAD_DELEGATE_TCP_TIMERS) { - // There are scenarios when rx_wait_helper is called in an infinite loop but exits before - // OS epoll_wait. Delegated TCP timers must be attempted in such case. - // This is a slow path. So calling chrono::now(), even with every iteration, is OK here. + // There are scenarios when rx_wait_helper is called in an infinite loop but exits + // before OS epoll_wait. Delegated TCP timers must be attempted in such case. This is a + // slow path. So calling chrono::now(), even with every iteration, is OK here. g_event_handler_manager_local.do_tasks(); } @@ -6047,7 +6069,8 @@ void tcp_timers_collection::remove_timer(sockinfo_tcp *sock) __log_dbg("TCP socket [%p] timer was removed", sock); } else { // Listen sockets are not added to timers. - // As part of socket general unregister and destroy they will get here and will no be found. + // As part of socket general unregister and destroy they will get here and will not be + // found. __log_dbg("TCP socket [%p] timer was not found (listen socket)", sock); } }