Skip to content

Commit d6f0288

Browse files
committed
Checkpoint: draft. Has locking bug that needs fixin'
1 parent 5423375 commit d6f0288

File tree

9 files changed

+197
-170
lines changed

9 files changed

+197
-170
lines changed

include/qpid/dispatch/cutthrough_utils.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*
2828
* @param msg - Pointer to a stream
2929
*/
30-
void cutthrough_notify_buffers_produced_inbound(qd_message_t *msg);
31-
void cutthrough_notify_buffers_consumed_outbound(qd_message_t *msg);
30+
void cutthrough_notify_buffers_produced_inbound(const qd_message_activation_t *activation);
31+
void cutthrough_notify_buffers_consumed_outbound(const qd_message_activation_t *activation);
3232

3333
#endif

include/qpid/dispatch/message.h

+6-22
Original file line numberDiff line numberDiff line change
@@ -754,20 +754,6 @@ void qd_message_produce_buffers(qd_message_t *stream, qd_buffer_list_t *buffers)
754754
int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers, int limit);
755755

756756

757-
/**
758-
* Indicate whether this stream should be resumed from a stalled state. This will be the case
759-
* if (a) the stream was stalled due to being full, and (b) the payload has shrunk down below
760-
* the resume threshold.
761-
*
762-
* If the result is true, there is a side effect of clearing the 'stalled' state.
763-
*
764-
* @param stream Pointer to the message
765-
* @return true Yes, the stream was stalled and buffer production may continue
766-
* @return false No, the stream was not stalled or it was stalled and is not yet ready to resume
767-
*/
768-
bool qd_message_resume_from_stalled(qd_message_t *stream);
769-
770-
771757
typedef enum {
772758
QD_ACTIVATION_NONE = 0,
773759
QD_ACTIVATION_AMQP,
@@ -784,33 +770,31 @@ typedef struct {
784770
* Tell the message stream which connection is consuming its buffers.
785771
*
786772
* @param stream Pointer to the message
787-
* @param connection Pointer to the qd_connection that is consuming this stream's buffers
773+
* @param activation Parameters for activating the consuming I/O thread
788774
*/
789775
void qd_message_set_consumer_activation(qd_message_t *stream, qd_message_activation_t *activation);
790776

791777
/**
792-
* Return the connection that is consuming this message stream's buffers.
778+
* Cancel the activation. No further activations will be occur on return from this call.
793779
*
794780
* @param stream Pointer to the message
795-
* @return qd_connection_t* Pointer to the connection that is consuming buffers from this stream
796781
*/
797-
void qd_message_get_consumer_activation(const qd_message_t *stream, qd_message_activation_t *activation);
782+
void qd_message_cancel_consumer_activation(qd_message_t *stream);
798783

799784
/**
800785
* Tell the message stream which connection is producing its buffers.
801786
*
802787
* @param stream Pointer to the message
803-
* @param connection Pointer to the qd_connection that is consuming this stream's buffers
788+
* @param activation Parameters for activating the producing I/O thread
804789
*/
805790
void qd_message_set_producer_activation(qd_message_t *stream, qd_message_activation_t *activation);
806791

807792
/**
808-
* Return the connection that is producing this message stream's buffers.
793+
* Cancel the activation. No further activations will occur on return from this call.
809794
*
810795
* @param stream Pointer to the message
811-
* @return qd_connection_t* Pointer to the connection that is consuming buffers from this stream
812796
*/
813-
void qd_message_get_producer_activation(const qd_message_t *stream, qd_message_activation_t *activation);
797+
void qd_message_cancel_producer_activation(qd_message_t *stream);
814798

815799
///@}
816800

src/adaptors/tcp_lite/tcp_lite.c

+31-32
Original file line numberDiff line numberDiff line change
@@ -515,17 +515,14 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay)
515515

516516
free(conn->reply_to);
517517

518-
qd_message_activation_t activation;
519-
activation.type = QD_ACTIVATION_NONE;
520-
activation.delivery = 0;
521-
qd_nullify_safe_ptr(&activation.safeptr);
522-
523518
if (!!conn->inbound_stream) {
524-
qd_message_set_producer_activation(conn->inbound_stream, &activation);
519+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
520+
qd_message_cancel_producer_activation(conn->inbound_stream);
525521
}
526522

527523
if (!!conn->outbound_stream) {
528-
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
524+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel consuer activation", DLV_ARGS(conn->outbound_delivery));
525+
qd_message_cancel_consumer_activation(conn->outbound_stream);
529526
}
530527

531528
if (!!conn->inbound_delivery) {
@@ -890,6 +887,17 @@ static void link_setup_CSIDE_IO(tcplite_connection_t *conn, qdr_delivery_t *deli
890887
qdr_link_set_context(conn->inbound_link, conn);
891888
conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.out", 0, false, delivery, &conn->outbound_link_id);
892889
qdr_link_set_context(conn->outbound_link, conn);
890+
891+
// now that the raw connection is up and able to be activated enable cutthrough activation
892+
893+
assert(conn->outbound_stream);
894+
qd_message_activation_t activation;
895+
activation.type = QD_ACTIVATION_TCP;
896+
activation.delivery = 0;
897+
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
898+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP enabling consumer activation", DLV_ARGS(delivery));
899+
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
900+
qd_message_start_unicast_cutthrough(conn->outbound_stream);
893901
}
894902

895903

@@ -956,6 +964,7 @@ static bool try_compose_and_send_client_stream_LSIDE_IO(tcplite_connection_t *co
956964
activation.type = QD_ACTIVATION_TCP;
957965
activation.delivery = 0;
958966
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
967+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation", conn->conn_id, conn->inbound_link_id);
959968
qd_message_set_producer_activation(conn->inbound_stream, &activation);
960969
qd_message_start_unicast_cutthrough(conn->inbound_stream);
961970

@@ -1022,6 +1031,7 @@ static void compose_and_send_server_stream_CSIDE_IO(tcplite_connection_t *conn)
10221031
activation.type = QD_ACTIVATION_TCP;
10231032
activation.delivery = 0;
10241033
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
1034+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation", conn->conn_id, conn->inbound_link_id);
10251035
qd_message_set_producer_activation(conn->inbound_stream, &activation);
10261036
qd_message_start_unicast_cutthrough(conn->inbound_stream);
10271037

@@ -1092,6 +1102,7 @@ static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qd
10921102
activation.type = QD_ACTIVATION_TCP;
10931103
activation.delivery = 0;
10941104
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
1105+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP enabling consumer activation", DLV_ARGS(delivery));
10951106
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
10961107
qd_message_start_unicast_cutthrough(conn->outbound_stream);
10971108
}
@@ -1196,16 +1207,6 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd
11961207
conn->raw_conn = pn_raw_connection();
11971208
pn_raw_connection_set_context(conn->raw_conn, &conn->context);
11981209

1199-
//
1200-
// Message validation ensures the start of the message body is present. Activate cutthrough on the body data.
1201-
//
1202-
qd_message_activation_t activation;
1203-
activation.type = QD_ACTIVATION_TCP;
1204-
activation.delivery = 0;
1205-
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
1206-
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
1207-
qd_message_start_unicast_cutthrough(conn->outbound_stream);
1208-
12091210
//
12101211
// The raw connection establishment must be the last thing done in this function.
12111212
// After this call, a separate IO thread may immediately be invoked in the context
@@ -1289,9 +1290,11 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
12891290
if (pn_raw_connection_is_read_closed(conn->raw_conn) && !blocked) {
12901291
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Read-closed - close inbound delivery", conn->conn_id);
12911292
qd_message_set_receive_complete(conn->inbound_stream);
1293+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
1294+
qd_message_cancel_producer_activation(conn->inbound_stream);
12921295
qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, false);
12931296
qdr_delivery_set_context(conn->inbound_delivery, 0);
1294-
qdr_delivery_decref(tcplite_context->core, conn->inbound_delivery, "TCP_LSIDE_IO - read-close");
1297+
qdr_delivery_decref(tcplite_context->core, conn->inbound_delivery, "FLOW_XSIDE_IO - inbound_delivery released");
12951298
conn->inbound_delivery = 0;
12961299
conn->inbound_stream = 0;
12971300
return true;
@@ -1360,8 +1363,12 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
13601363
if (qd_message_receive_complete(conn->outbound_stream) && !qd_message_can_consume_buffers(conn->outbound_stream)) {
13611364
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Rx-complete, rings empty: Write-closing the raw connection", conn->conn_id);
13621365
pn_raw_connection_write_close(conn->raw_conn);
1366+
qd_message_set_send_complete(conn->outbound_stream);
1367+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel consumer activation", DLV_ARGS(conn->outbound_delivery));
1368+
qd_message_cancel_consumer_activation(conn->outbound_stream);
13631369
qdr_delivery_set_context(conn->outbound_delivery, 0);
13641370
qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery, PN_ACCEPTED, true, 0, true); // accepted, settled, ref_given
1371+
// do NOT decref outbound_delivery - ref count passed to qdr_delivery_remote_state_updated()!
13651372
conn->outbound_delivery = 0;
13661373
conn->outbound_stream = 0;
13671374
} else {
@@ -1532,15 +1539,11 @@ static bool manage_tls_flow_XSIDE_IO(tcplite_connection_t *conn)
15321539
//
15331540
bool ignore;
15341541
if (qd_tls_is_input_drained(conn->tls, &ignore) && conn->inbound_stream) {
1535-
qd_message_activation_t activation;
1536-
activation.type = QD_ACTIVATION_NONE;
1537-
activation.delivery = 0;
1538-
qd_nullify_safe_ptr(&activation.safeptr);
1539-
15401542
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TLS inbound stream receive complete", DLV_ARGS(conn->inbound_delivery));
15411543

15421544
qd_message_set_receive_complete(conn->inbound_stream);
1543-
qd_message_set_producer_activation(conn->inbound_stream, &activation);
1545+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
1546+
qd_message_cancel_producer_activation(conn->inbound_stream);
15441547

15451548
qdr_delivery_set_context(conn->inbound_delivery, 0);
15461549
qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, false);
@@ -1553,21 +1556,17 @@ static bool manage_tls_flow_XSIDE_IO(tcplite_connection_t *conn)
15531556
// Check for end of outbound message data
15541557
//
15551558
if (qd_tls_is_output_flushed(conn->tls) && conn->outbound_stream) {
1556-
qd_message_activation_t activation;
1557-
activation.type = QD_ACTIVATION_NONE;
1558-
activation.delivery = 0;
1559-
qd_nullify_safe_ptr(&activation.safeptr);
1560-
15611559
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TLS outbound stream send complete", DLV_ARGS(conn->outbound_delivery));
15621560

15631561
qd_message_set_send_complete(conn->outbound_stream);
1564-
qd_message_set_consumer_activation(conn->outbound_stream, &activation);
1562+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel consumer activation", DLV_ARGS(conn->outbound_delivery));
1563+
qd_message_cancel_consumer_activation(conn->outbound_stream);
15651564

15661565
qdr_delivery_set_context(conn->outbound_delivery, 0);
15671566
qdr_delivery_remote_state_updated(tcplite_context->core, conn->outbound_delivery,
15681567
tls_status < 0 ? PN_MODIFIED : PN_ACCEPTED,
1569-
true, 0, false); // settled, 0, ref_given
1570-
qdr_delivery_decref(tcplite_context->core, conn->outbound_delivery, "TLS_FLOW_XSIDE_IO - outbound_delivery released");
1568+
true, 0, true); // settled, 0, ref_given
1569+
// do NOT decref outbound_delivery - ref count passed to qdr_delivery_remote_state_updated()!
15711570
conn->outbound_delivery = 0;
15721571
conn->outbound_stream = 0;
15731572
}

src/cutthrough_utils.c

+5-12
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include "adaptors/tcp_lite/tcp_lite.h"
2626

2727

28-
static void activate_connection(qd_message_activation_t *activation, qd_direction_t dir)
28+
static void activate_connection(const qd_message_activation_t *activation, qd_direction_t dir)
2929
{
3030
switch (activation->type) {
3131
case QD_ACTIVATION_NONE:
@@ -77,20 +77,13 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio
7777
}
7878

7979

80-
void cutthrough_notify_buffers_produced_inbound(qd_message_t *msg)
80+
void cutthrough_notify_buffers_produced_inbound(const qd_message_activation_t *activation)
8181
{
82-
qd_message_activation_t activation;
83-
qd_message_get_consumer_activation(msg, &activation);
84-
activate_connection(&activation, QD_OUTGOING);
82+
activate_connection(activation, QD_OUTGOING);
8583
}
8684

8785

88-
void cutthrough_notify_buffers_consumed_outbound(qd_message_t *msg)
86+
void cutthrough_notify_buffers_consumed_outbound(const qd_message_activation_t *activation)
8987
{
90-
bool unstall = qd_message_resume_from_stalled(msg);
91-
if (unstall) {
92-
qd_message_activation_t activation;
93-
qd_message_get_producer_activation(msg, &activation);
94-
activate_connection(&activation, QD_INCOMING);
95-
}
88+
activate_connection(activation, QD_INCOMING);
9689
}

src/message.c

+42-30
Original file line numberDiff line numberDiff line change
@@ -1528,6 +1528,31 @@ bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg)
15281528
return false;
15291529
}
15301530

1531+
static inline void activate_message_consumer(qd_message_t *stream)
1532+
{
1533+
qd_message_content_t *content = MSG_CONTENT(stream);
1534+
LOCK(&content->lock);
1535+
if (content->uct_consumer_activation.type != QD_ACTIVATION_NONE) {
1536+
cutthrough_notify_buffers_produced_inbound(&content->uct_consumer_activation);
1537+
}
1538+
UNLOCK(&content->lock);
1539+
}
1540+
1541+
1542+
static inline void activate_message_producer(qd_message_t *stream)
1543+
{
1544+
qd_message_content_t *content = MSG_CONTENT(stream);
1545+
1546+
uint32_t full_slots = (sys_atomic_get(&content->uct_produce_slot) - sys_atomic_get(&content->uct_consume_slot)) % UCT_SLOT_COUNT;
1547+
if (full_slots < UCT_RESUME_THRESHOLD) {
1548+
LOCK(&content->lock);
1549+
if (content->uct_producer_activation.type != QD_ACTIVATION_NONE) {
1550+
cutthrough_notify_buffers_consumed_outbound(&content->uct_producer_activation);
1551+
}
1552+
UNLOCK(&content->lock);
1553+
}
1554+
}
1555+
15311556

15321557
// Read incoming data from a pn_link_t and store it into a buffer list. Limit buffer list length to a maximum of
15331558
// limit. Helper routine for qd_message_receive_cutthrough()
@@ -1576,7 +1601,6 @@ static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *d
15761601

15771602
if ((sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT == 1) {
15781603
stalled = true;
1579-
SET_ATOMIC_FLAG(&content->uct_producer_stalled);
15801604
}
15811605
}
15821606

@@ -1595,7 +1619,7 @@ static void qd_message_receive_cutthrough(qd_message_t *in_msg, pn_delivery_t *d
15951619
}
15961620

15971621
if (notify_produced) {
1598-
cutthrough_notify_buffers_produced_inbound(in_msg);
1622+
activate_message_consumer(in_msg);
15991623
}
16001624
}
16011625

@@ -1912,10 +1936,11 @@ static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_conten
19121936
// all of the buffered content. Mark the message as send-complete.
19131937
//
19141938
SET_ATOMIC_FLAG(&msg->send_complete);
1939+
notify_consumed = false; // no need to restart producer - it is done
19151940
}
19161941

19171942
if (notify_consumed) {
1918-
cutthrough_notify_buffers_consumed_outbound((qd_message_t *) msg);
1943+
activate_message_producer((qd_message_t *) msg);
19191944
}
19201945
}
19211946

@@ -3317,11 +3342,7 @@ void qd_message_produce_buffers(qd_message_t *stream, qd_buffer_list_t *buffers)
33173342
uint32_t useSlot = sys_atomic_get(&content->uct_produce_slot);
33183343
DEQ_MOVE(*buffers, content->uct_slots[useSlot]);
33193344
sys_atomic_set(&content->uct_produce_slot, (useSlot + 1) % UCT_SLOT_COUNT);
3320-
cutthrough_notify_buffers_produced_inbound(stream);
3321-
3322-
if ((sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT == 1) {
3323-
SET_ATOMIC_FLAG(&content->uct_producer_stalled);
3324-
}
3345+
activate_message_consumer(stream);
33253346
}
33263347

33273348

@@ -3348,50 +3369,41 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
33483369
}
33493370

33503371
if (notify_consumed) {
3351-
cutthrough_notify_buffers_consumed_outbound(stream);
3372+
activate_message_producer(stream);
33523373
}
33533374

33543375
return count;
33553376
}
33563377

3357-
3358-
bool qd_message_resume_from_stalled(qd_message_t *stream)
3359-
{
3360-
qd_message_content_t *content = MSG_CONTENT(stream);
3361-
3362-
if (IS_ATOMIC_FLAG_SET(&content->uct_producer_stalled)
3363-
&& (sys_atomic_get(&content->uct_produce_slot) - sys_atomic_get(&content->uct_consume_slot)) % UCT_SLOT_COUNT < UCT_RESUME_THRESHOLD) {
3364-
CLEAR_ATOMIC_FLAG(&content->uct_producer_stalled);
3365-
return true;
3366-
}
3367-
3368-
return false;
3369-
}
3370-
3371-
33723378
void qd_message_set_consumer_activation(qd_message_t *stream, qd_message_activation_t *activation)
33733379
{
33743380
qd_message_content_t *content = MSG_CONTENT(stream);
3381+
LOCK(&content->lock);
33753382
content->uct_consumer_activation = *activation;
3383+
UNLOCK(&content->lock);
33763384
}
33773385

33783386

3379-
void qd_message_get_consumer_activation(const qd_message_t *stream, qd_message_activation_t *activation)
3387+
void qd_message_cancel_consumer_activation(qd_message_t *stream)
33803388
{
33813389
qd_message_content_t *content = MSG_CONTENT(stream);
3382-
*activation = content->uct_consumer_activation;
3390+
LOCK(&content->lock);
3391+
content->uct_consumer_activation.type = QD_ACTIVATION_NONE;
3392+
UNLOCK(&content->lock);
33833393
}
33843394

3385-
33863395
void qd_message_set_producer_activation(qd_message_t *stream, qd_message_activation_t *activation)
33873396
{
33883397
qd_message_content_t *content = MSG_CONTENT(stream);
3398+
LOCK(&content->lock);
33893399
content->uct_producer_activation = *activation;
3400+
UNLOCK(&content->lock);
33903401
}
33913402

3392-
3393-
void qd_message_get_producer_activation(const qd_message_t *stream, qd_message_activation_t *activation)
3403+
void qd_message_cancel_producer_activation(qd_message_t *stream)
33943404
{
33953405
qd_message_content_t *content = MSG_CONTENT(stream);
3396-
*activation = content->uct_producer_activation;
3406+
LOCK(&content->lock);
3407+
content->uct_producer_activation.type = QD_ACTIVATION_NONE;
3408+
UNLOCK(&content->lock);
33973409
}

0 commit comments

Comments
 (0)