@@ -1570,7 +1570,7 @@ static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *d
1570
1570
// Data received, advance the producer slot pointer
1571
1571
//
1572
1572
notify_produced = true;
1573
- qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_receive_cutthrough - %u octets written to use_slot=%u" ,
1573
+ qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_receive_cutthrough !KAG! - %u octets written to use_slot=%u" ,
1574
1574
qd_buffer_list_length (& content -> uct_slots [use_slot ]), use_slot );
1575
1575
sys_atomic_set (& content -> uct_produce_slot , (use_slot + 1 ) % UCT_SLOT_COUNT );
1576
1576
@@ -1887,16 +1887,21 @@ static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_conten
1887
1887
* q3_stalled = !IS_ATOMIC_FLAG_SET (& content -> aborted ) && (pn_session_outgoing_bytes (pns ) >= q3_upper );
1888
1888
while (!* q3_stalled && (sys_atomic_get (& content -> uct_consume_slot ) - sys_atomic_get (& content -> uct_produce_slot )) % UCT_SLOT_COUNT != 0 ) {
1889
1889
uint32_t use_slot = sys_atomic_get (& content -> uct_consume_slot );
1890
+ size_t kag_size = 0 ;
1890
1891
1891
1892
qd_buffer_t * buf = DEQ_HEAD (content -> uct_slots [use_slot ]);
1892
1893
while (!!buf ) {
1893
1894
DEQ_REMOVE_HEAD (content -> uct_slots [use_slot ]);
1894
1895
if (!IS_ATOMIC_FLAG_SET (& content -> aborted )) {
1895
- pn_link_send (pnl , (char * ) qd_buffer_base (buf ), qd_buffer_size (buf ));
1896
+ ssize_t sent = pn_link_send (pnl , (char * ) qd_buffer_base (buf ), qd_buffer_size (buf ));
1897
+ (void ) sent ;
1898
+ assert (sent == qd_buffer_size (buf ));
1899
+ kag_size += qd_buffer_size (buf );
1896
1900
}
1897
1901
qd_buffer_free (buf );
1898
1902
buf = DEQ_HEAD (content -> uct_slots [use_slot ]);
1899
1903
}
1904
+ qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_send_cut_through !KAG! use_slot=%" PRIu32 " consumed %zu octets" , use_slot , kag_size );
1900
1905
1901
1906
sys_atomic_set (& content -> uct_consume_slot , (use_slot + 1 ) % UCT_SLOT_COUNT );
1902
1907
notify_consumed = true;
@@ -3313,6 +3318,8 @@ void qd_message_produce_buffers(qd_message_t *stream, qd_buffer_list_t *buffers)
3313
3318
assert (qd_message_can_produce_buffers (stream ));
3314
3319
3315
3320
uint32_t useSlot = sys_atomic_get (& content -> uct_produce_slot );
3321
+ qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_produce_buffers !KAG! use_slot=%" PRIu32 " produced %u octets" ,
3322
+ useSlot , qd_buffer_list_length (buffers ));
3316
3323
DEQ_MOVE (* buffers , content -> uct_slots [useSlot ]);
3317
3324
sys_atomic_set (& content -> uct_produce_slot , (useSlot + 1 ) % UCT_SLOT_COUNT );
3318
3325
cutthrough_notify_buffers_produced_inbound (stream );
@@ -3332,14 +3339,15 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
3332
3339
3333
3340
while (count < limit && !empty ) {
3334
3341
uint32_t useSlot = sys_atomic_get (& content -> uct_consume_slot );
3335
- qd_log ( LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_consume_buffers useSlot=%" PRIu32 "" , useSlot ) ;
3342
+ size_t kag_total = 0 ;
3336
3343
while (count < limit && !DEQ_IS_EMPTY (content -> uct_slots [useSlot ])) {
3337
3344
qd_buffer_t * buf = DEQ_HEAD (content -> uct_slots [useSlot ]);
3338
- qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_consume_buffers buf size=%zu" , qd_buffer_size (buf ));
3339
3345
DEQ_REMOVE_HEAD (content -> uct_slots [useSlot ]);
3340
3346
DEQ_INSERT_TAIL (* buffers , buf );
3341
3347
count ++ ;
3348
+ kag_total += qd_buffer_size (buf );
3342
3349
}
3350
+ qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_consume_buffers !KAG! use_slot=%" PRIu32 " consumed %zu octets" , useSlot , kag_total );
3343
3351
if (DEQ_IS_EMPTY (content -> uct_slots [useSlot ])) {
3344
3352
notify_consumed = true;
3345
3353
sys_atomic_set (& content -> uct_consume_slot , (useSlot + 1 ) % UCT_SLOT_COUNT );
0 commit comments