@@ -1582,7 +1582,7 @@ static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *d
1582
1582
1583
1583
// Check for rx complete, error, or no data available:
1584
1584
1585
- if (rc < 0 || ! pn_delivery_partial ( delivery ) ) {
1585
+ if (rc < 0 ) {
1586
1586
if (pn_delivery_aborted (delivery )) {
1587
1587
qd_message_set_aborted (in_msg );
1588
1588
}
@@ -1892,7 +1892,9 @@ static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_conten
1892
1892
while (!!buf ) {
1893
1893
DEQ_REMOVE_HEAD (content -> uct_slots [use_slot ]);
1894
1894
if (!IS_ATOMIC_FLAG_SET (& content -> aborted )) {
1895
- pn_link_send (pnl , (char * ) qd_buffer_base (buf ), qd_buffer_size (buf ));
1895
+ ssize_t sent = pn_link_send (pnl , (char * ) qd_buffer_base (buf ), qd_buffer_size (buf ));
1896
+ (void ) sent ;
1897
+ assert (sent == qd_buffer_size (buf ));
1896
1898
}
1897
1899
qd_buffer_free (buf );
1898
1900
buf = DEQ_HEAD (content -> uct_slots [use_slot ]);
@@ -3332,10 +3334,8 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
3332
3334
3333
3335
while (count < limit && !empty ) {
3334
3336
uint32_t useSlot = sys_atomic_get (& content -> uct_consume_slot );
3335
- qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_consume_buffers useSlot=%" PRIu32 "" , useSlot );
3336
3337
while (count < limit && !DEQ_IS_EMPTY (content -> uct_slots [useSlot ])) {
3337
3338
qd_buffer_t * buf = DEQ_HEAD (content -> uct_slots [useSlot ]);
3338
- qd_log (LOG_MESSAGE , QD_LOG_DEBUG , "qd_message_consume_buffers buf size=%zu" , qd_buffer_size (buf ));
3339
3339
DEQ_REMOVE_HEAD (content -> uct_slots [useSlot ]);
3340
3340
DEQ_INSERT_TAIL (* buffers , buf );
3341
3341
count ++ ;
0 commit comments