From c75d4d3553c1398925d5ea9dbd73aa6e92a4e847 Mon Sep 17 00:00:00 2001 From: Ken Giusti Date: Mon, 29 Apr 2024 16:53:23 -0400 Subject: [PATCH] ISSUE-1480: add tcp window vanflow events (#1488) Note this does not resolve ISSUE #1480, it just fixes one aspect of it. --- src/adaptors/tcp/tcp_adaptor.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 65adf3ca7..f426e1577 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -1174,6 +1174,7 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect conn->common.vflow = vflow_start_record(VFLOW_RECORD_FLOW, connector->common.vflow); vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_SIZE, TCP_MAX_CAPACITY_BYTES); extract_metadata_from_stream_CSIDE(conn); @@ -1246,10 +1247,12 @@ static bool manage_flow_XSIDE_IO(qd_tcp_connection_t *conn) if (octet_count > 0) { qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', octet_count); if (!was_blocked && window_full(conn) && !read_closed) { + uint64_t unacked = conn->inbound_octets - conn->window.last_update; conn->window.closed_count += 1; + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, unacked); + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_CLOSURES, conn->window.closed_count); qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64, - DLV_ARGS(conn->inbound_delivery), conn->inbound_octets, - (conn->inbound_octets - conn->window.last_update)); + DLV_ARGS(conn->inbound_delivery), conn->inbound_octets, unacked); } } @@ -1499,7 +1502,10 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn) qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE TLS read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', decrypted_octets); if (!window_blocked && window_full(conn)) { + uint64_t unacked = conn->inbound_octets - conn->window.last_update; conn->window.closed_count += 1; + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, unacked); + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_CLOSURES, conn->window.closed_count); qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64, DLV_ARGS(conn->inbound_delivery), conn->inbound_octets, (conn->inbound_octets - conn->window.last_update)); @@ -1966,6 +1972,7 @@ static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listene conn->common.vflow = vflow_start_record(VFLOW_RECORD_FLOW, li->common.vflow); vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, 0); + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_WINDOW_SIZE, TCP_MAX_CAPACITY_BYTES); conn->context.context = conn; conn->context.handler = on_connection_event_LSIDE_IO; @@ -2161,10 +2168,10 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore); // Resend released will generate a PN_RECEIVED with section_offset == 0, ignore it. Ensure updates - // arrive in order, which may be possible if cut-through for disposition updates is implemented. + // arrive in order, which may not happen if cut-through for disposition updates is implemented. if (dstate && dstate->section_offset > 0 && (int64_t)(dstate->section_offset - conn->window.last_update) > 0) { - //vflow_set_uint64(tc->vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, tc->bytes_unacked); + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " PN_RECEIVED inbound_bytes=%" PRIu64 ", was_unacked=%" PRIu64 ", rcv_offset=%" PRIu64 " now_unacked=%" PRIu64, DLV_ARGS(dlv), conn->inbound_octets, @@ -2172,6 +2179,8 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di dstate->section_offset, (conn->inbound_octets - dstate->section_offset)); conn->window.last_update = dstate->section_offset; + vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS_UNACKED, conn->inbound_octets - dstate->section_offset); + qd_delivery_state_free(dstate); } }