@@ -539,17 +539,14 @@ static void close_connection_XSIDE_IO(tcplite_connection_t *conn, bool no_delay)
539
539
540
540
free (conn -> reply_to );
541
541
542
- qd_message_activation_t activation ;
543
- activation .type = QD_ACTIVATION_NONE ;
544
- activation .delivery = 0 ;
545
- qd_nullify_safe_ptr (& activation .safeptr );
546
-
547
542
if (!!conn -> inbound_stream ) {
548
- qd_message_set_producer_activation (conn -> inbound_stream , & activation );
543
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TCP cancel producer activation" , DLV_ARGS (conn -> inbound_delivery ));
544
+ qd_message_cancel_producer_activation (conn -> inbound_stream );
549
545
}
550
546
551
547
if (!!conn -> outbound_stream ) {
552
- qd_message_set_consumer_activation (conn -> outbound_stream , & activation );
548
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TCP cancel consumer activation" , DLV_ARGS (conn -> outbound_delivery ));
549
+ qd_message_cancel_consumer_activation (conn -> outbound_stream );
553
550
}
554
551
555
552
if (!!conn -> inbound_delivery ) {
@@ -917,6 +914,17 @@ static void link_setup_CSIDE_IO(tcplite_connection_t *conn, qdr_delivery_t *deli
917
914
qdr_link_set_context (conn -> inbound_link , conn );
918
915
conn -> outbound_link = qdr_link_first_attach (conn -> core_conn , QD_OUTGOING , qdr_terminus (0 ), qdr_terminus (0 ), "tcp.cside.out" , 0 , false, delivery , & conn -> outbound_link_id );
919
916
qdr_link_set_context (conn -> outbound_link , conn );
917
+
918
+ // now that the raw connection is up and able to be activated enable cutthrough activation
919
+
920
+ assert (conn -> outbound_stream );
921
+ qd_message_activation_t activation ;
922
+ activation .type = QD_ACTIVATION_TCP ;
923
+ activation .delivery = 0 ;
924
+ qd_alloc_set_safe_ptr (& activation .safeptr , conn );
925
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TCP enabling consumer activation" , DLV_ARGS (delivery ));
926
+ qd_message_set_consumer_activation (conn -> outbound_stream , & activation );
927
+ qd_message_start_unicast_cutthrough (conn -> outbound_stream );
920
928
}
921
929
922
930
@@ -983,6 +991,7 @@ static bool try_compose_and_send_client_stream_LSIDE_IO(tcplite_connection_t *co
983
991
activation .type = QD_ACTIVATION_TCP ;
984
992
activation .delivery = 0 ;
985
993
qd_alloc_set_safe_ptr (& activation .safeptr , conn );
994
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation" , conn -> conn_id , conn -> inbound_link_id );
986
995
qd_message_set_producer_activation (conn -> inbound_stream , & activation );
987
996
qd_message_start_unicast_cutthrough (conn -> inbound_stream );
988
997
@@ -1049,6 +1058,7 @@ static void compose_and_send_server_stream_CSIDE_IO(tcplite_connection_t *conn)
1049
1058
activation .type = QD_ACTIVATION_TCP ;
1050
1059
activation .delivery = 0 ;
1051
1060
qd_alloc_set_safe_ptr (& activation .safeptr , conn );
1061
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation" , conn -> conn_id , conn -> inbound_link_id );
1052
1062
qd_message_set_producer_activation (conn -> inbound_stream , & activation );
1053
1063
qd_message_start_unicast_cutthrough (conn -> inbound_stream );
1054
1064
@@ -1119,6 +1129,7 @@ static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qd
1119
1129
activation .type = QD_ACTIVATION_TCP ;
1120
1130
activation .delivery = 0 ;
1121
1131
qd_alloc_set_safe_ptr (& activation .safeptr , conn );
1132
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TCP enabling consumer activation" , DLV_ARGS (delivery ));
1122
1133
qd_message_set_consumer_activation (conn -> outbound_stream , & activation );
1123
1134
qd_message_start_unicast_cutthrough (conn -> outbound_stream );
1124
1135
}
@@ -1184,16 +1195,6 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd
1184
1195
conn -> raw_conn = pn_raw_connection ();
1185
1196
pn_raw_connection_set_context (conn -> raw_conn , & conn -> context );
1186
1197
1187
- //
1188
- // Message validation ensures the start of the message body is present. Activate cutthrough on the body data.
1189
- //
1190
- qd_message_activation_t activation ;
1191
- activation .type = QD_ACTIVATION_TCP ;
1192
- activation .delivery = 0 ;
1193
- qd_alloc_set_safe_ptr (& activation .safeptr , conn );
1194
- qd_message_set_consumer_activation (conn -> outbound_stream , & activation );
1195
- qd_message_start_unicast_cutthrough (conn -> outbound_stream );
1196
-
1197
1198
//
1198
1199
// The raw connection establishment must be the last thing done in this function.
1199
1200
// After this call, a separate IO thread may immediately be invoked in the context
@@ -1274,12 +1275,16 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
1274
1275
// If the raw connection is read-closed and the last produce did not block, settle and complete
1275
1276
// the inbound stream/delivery and close out the inbound half of the connection.
1276
1277
//
1278
+
1277
1279
if (read_closed ) {
1278
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " Raw conn read-closed - close inbound delivery" , DLV_ARGS (conn -> inbound_delivery ));
1280
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG ,
1281
+ DLV_FMT " Raw conn read-closed - close inbound delivery, cancel producer activation" ,
1282
+ DLV_ARGS (conn -> inbound_delivery ));
1279
1283
qd_message_set_receive_complete (conn -> inbound_stream );
1284
+ qd_message_cancel_producer_activation (conn -> inbound_stream );
1280
1285
qdr_delivery_continue (tcplite_context -> core , conn -> inbound_delivery , false);
1281
1286
qdr_delivery_set_context (conn -> inbound_delivery , 0 );
1282
- qdr_delivery_decref (tcplite_context -> core , conn -> inbound_delivery , "TCP_LSIDE_IO - read-close " );
1287
+ qdr_delivery_decref (tcplite_context -> core , conn -> inbound_delivery , "FLOW_XSIDE_IO - inbound_delivery released " );
1283
1288
conn -> inbound_delivery = 0 ;
1284
1289
conn -> inbound_stream = 0 ;
1285
1290
return true;
@@ -1346,10 +1351,14 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
1346
1351
// payload has been consumed and written before write-closing the connection.
1347
1352
//
1348
1353
if (qd_message_receive_complete (conn -> outbound_stream ) && !qd_message_can_consume_buffers (conn -> outbound_stream )) {
1349
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] Rx-complete, rings empty: Write-closing the raw connection" , conn -> conn_id );
1354
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " Rx-complete, rings empty: Write-closing the raw connection, consumer activation cancelled" ,
1355
+ DLV_ARGS (conn -> outbound_delivery ));
1350
1356
pn_raw_connection_write_close (conn -> raw_conn );
1357
+ qd_message_set_send_complete (conn -> outbound_stream );
1358
+ qd_message_cancel_consumer_activation (conn -> outbound_stream );
1351
1359
qdr_delivery_set_context (conn -> outbound_delivery , 0 );
1352
1360
qdr_delivery_remote_state_updated (tcplite_context -> core , conn -> outbound_delivery , PN_ACCEPTED , true, 0 , true); // accepted, settled, ref_given
1361
+ // do NOT decref outbound_delivery - ref count passed to qdr_delivery_remote_state_updated()!
1353
1362
conn -> outbound_delivery = 0 ;
1354
1363
conn -> outbound_stream = 0 ;
1355
1364
} else {
@@ -1520,15 +1529,10 @@ static bool manage_tls_flow_XSIDE_IO(tcplite_connection_t *conn)
1520
1529
//
1521
1530
bool ignore ;
1522
1531
if (qd_tls_is_input_drained (conn -> tls , & ignore ) && conn -> inbound_stream ) {
1523
- qd_message_activation_t activation ;
1524
- activation .type = QD_ACTIVATION_NONE ;
1525
- activation .delivery = 0 ;
1526
- qd_nullify_safe_ptr (& activation .safeptr );
1527
-
1528
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TLS inbound stream receive complete" , DLV_ARGS (conn -> inbound_delivery ));
1532
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TLS inbound stream receive complete, producer activation cancelled" , DLV_ARGS (conn -> inbound_delivery ));
1529
1533
1530
1534
qd_message_set_receive_complete (conn -> inbound_stream );
1531
- qd_message_set_producer_activation (conn -> inbound_stream , & activation );
1535
+ qd_message_cancel_producer_activation (conn -> inbound_stream );
1532
1536
1533
1537
qdr_delivery_set_context (conn -> inbound_delivery , 0 );
1534
1538
qdr_delivery_continue (tcplite_context -> core , conn -> inbound_delivery , false);
@@ -1541,21 +1545,16 @@ static bool manage_tls_flow_XSIDE_IO(tcplite_connection_t *conn)
1541
1545
// Check for end of outbound message data
1542
1546
//
1543
1547
if (qd_tls_is_output_flushed (conn -> tls ) && conn -> outbound_stream ) {
1544
- qd_message_activation_t activation ;
1545
- activation .type = QD_ACTIVATION_NONE ;
1546
- activation .delivery = 0 ;
1547
- qd_nullify_safe_ptr (& activation .safeptr );
1548
-
1549
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TLS outbound stream send complete" , DLV_ARGS (conn -> outbound_delivery ));
1548
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , DLV_FMT " TLS outbound stream send complete, consumer activation cancelled" , DLV_ARGS (conn -> outbound_delivery ));
1550
1549
1551
1550
qd_message_set_send_complete (conn -> outbound_stream );
1552
- qd_message_set_consumer_activation (conn -> outbound_stream , & activation );
1551
+ qd_message_cancel_consumer_activation (conn -> outbound_stream );
1553
1552
1554
1553
qdr_delivery_set_context (conn -> outbound_delivery , 0 );
1555
1554
qdr_delivery_remote_state_updated (tcplite_context -> core , conn -> outbound_delivery ,
1556
1555
tls_status < 0 ? PN_MODIFIED : PN_ACCEPTED ,
1557
- true, 0 , false ); // settled, 0, ref_given
1558
- qdr_delivery_decref ( tcplite_context -> core , conn -> outbound_delivery , "TLS_FLOW_XSIDE_IO - outbound_delivery released" );
1556
+ true, 0 , true ); // settled, 0, ref_given
1557
+ // do NOT decref outbound_delivery - ref count passed to qdr_delivery_remote_state_updated()!
1559
1558
conn -> outbound_delivery = 0 ;
1560
1559
conn -> outbound_stream = 0 ;
1561
1560
}
0 commit comments