@@ -445,7 +445,9 @@ static void free_connection_IO(void *context)
445
445
tcplite_connection_t * conn = (tcplite_connection_t * ) context ;
446
446
qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] Cleaning up resources" , conn -> conn_id );
447
447
448
- // disable activation via Core thread
448
+ // Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
449
+ // attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
450
+ // take place.
449
451
sys_mutex_lock (& conn -> activation_lock );
450
452
CLEAR_ATOMIC_FLAG (& conn -> raw_opened );
451
453
sys_mutex_unlock (& conn -> activation_lock );
@@ -1208,9 +1210,7 @@ static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qd
1208
1210
// The raw connection establishment must be the last thing done in this function.
1209
1211
// After this call, a separate IO thread may immediately be invoked in the context
1210
1212
// of the new connection to handle raw connection events.
1211
- // ISSUE-1202 - Set the conn->raw_opened flag before calling pn_proactor_raw_connect()
1212
1213
//
1213
- SET_ATOMIC_FLAG (& conn -> raw_opened );
1214
1214
pn_proactor_raw_connect (tcplite_context -> proactor , conn -> raw_conn , cr -> adaptor_config -> host_port );
1215
1215
1216
1216
return QD_DELIVERY_MOVED_TO_NEW_LINK ;
@@ -1586,14 +1586,15 @@ static void connection_run_LSIDE_IO(tcplite_connection_t *conn)
1586
1586
1587
1587
switch (conn -> state ) {
1588
1588
case LSIDE_INITIAL :
1589
- // raw connection is active
1590
- if (conn -> tls ) {
1591
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] LSIDE_IO performing TLS handshake" , conn -> conn_id );
1592
- set_state_XSIDE_IO (conn , LSIDE_TLS_HANDSHAKE );
1593
- repeat = true;
1594
- } else {
1595
- link_setup_LSIDE_IO (conn );
1596
- set_state_XSIDE_IO (conn , LSIDE_LINK_SETUP );
1589
+ if (IS_ATOMIC_FLAG_SET (& conn -> raw_opened )) { // raw connection is active
1590
+ if (conn -> tls ) {
1591
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] LSIDE_IO performing TLS handshake" , conn -> conn_id );
1592
+ set_state_XSIDE_IO (conn , LSIDE_TLS_HANDSHAKE );
1593
+ repeat = true;
1594
+ } else {
1595
+ link_setup_LSIDE_IO (conn );
1596
+ set_state_XSIDE_IO (conn , LSIDE_LINK_SETUP );
1597
+ }
1597
1598
}
1598
1599
break ;
1599
1600
@@ -1685,9 +1686,10 @@ static void connection_run_CSIDE_IO(tcplite_connection_t *conn)
1685
1686
1686
1687
switch (conn -> state ) {
1687
1688
case CSIDE_INITIAL :
1688
- // raw connection is active
1689
- link_setup_CSIDE_IO (conn , conn -> outbound_delivery );
1690
- set_state_XSIDE_IO (conn , CSIDE_LINK_SETUP );
1689
+ if (IS_ATOMIC_FLAG_SET (& conn -> raw_opened )) { // raw connection is active
1690
+ link_setup_CSIDE_IO (conn , conn -> outbound_delivery );
1691
+ set_state_XSIDE_IO (conn , CSIDE_LINK_SETUP );
1692
+ }
1691
1693
break ;
1692
1694
1693
1695
case CSIDE_LINK_SETUP :
@@ -1835,10 +1837,15 @@ static char *get_tls_negotiated_alpn(qd_message_t *msg)
1835
1837
static void on_connection_event_LSIDE_IO (pn_event_t * e , qd_server_t * qd_server , void * context )
1836
1838
{
1837
1839
SET_THREAD_RAW_IO ;
1838
- tcplite_connection_t * conn = (tcplite_connection_t * ) context ;
1839
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] on_connection_event_LSIDE_IO: %s" , conn -> conn_id , pn_event_type_name (pn_event_type (e )));
1840
-
1841
- if (pn_event_type (e ) == PN_RAW_CONNECTION_DISCONNECTED ) {
1840
+ pn_event_type_t etype = pn_event_type (e );
1841
+ tcplite_connection_t * conn = (tcplite_connection_t * ) context ;
1842
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] on_connection_event_LSIDE_IO: %s" , conn -> conn_id , pn_event_type_name (etype ));
1843
+
1844
+ if (etype == PN_RAW_CONNECTION_CONNECTED ) {
1845
+ // it is safe to call pn_raw_connection_wake() now
1846
+ assert (!IS_ATOMIC_FLAG_SET (& conn -> raw_opened ));
1847
+ SET_ATOMIC_FLAG (& conn -> raw_opened );
1848
+ } else if (etype == PN_RAW_CONNECTION_DISCONNECTED ) {
1842
1849
conn -> error = !!conn -> raw_conn ? pn_raw_connection_condition (conn -> raw_conn ) : 0 ;
1843
1850
1844
1851
if (!!conn -> error ) {
@@ -1867,10 +1874,15 @@ static void on_connection_event_LSIDE_IO(pn_event_t *e, qd_server_t *qd_server,
1867
1874
static void on_connection_event_CSIDE_IO (pn_event_t * e , qd_server_t * qd_server , void * context )
1868
1875
{
1869
1876
SET_THREAD_RAW_IO ;
1870
- tcplite_connection_t * conn = (tcplite_connection_t * ) context ;
1871
- qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] on_connection_event_CSIDE_IO: %s" , conn -> conn_id , pn_event_type_name (pn_event_type (e )));
1872
-
1873
- if (pn_event_type (e ) == PN_RAW_CONNECTION_DISCONNECTED ) {
1877
+ pn_event_type_t etype = pn_event_type (e );
1878
+ tcplite_connection_t * conn = (tcplite_connection_t * ) context ;
1879
+ qd_log (LOG_TCP_ADAPTOR , QD_LOG_DEBUG , "[C%" PRIu64 "] on_connection_event_CSIDE_IO: %s" , conn -> conn_id , pn_event_type_name (etype ));
1880
+
1881
+ if (etype == PN_RAW_CONNECTION_CONNECTED ) {
1882
+ // it is safe to call pn_raw_connection_wake() now
1883
+ assert (!IS_ATOMIC_FLAG_SET (& conn -> raw_opened ));
1884
+ SET_ATOMIC_FLAG (& conn -> raw_opened );
1885
+ } else if (etype == PN_RAW_CONNECTION_DISCONNECTED ) {
1874
1886
conn -> error = !!conn -> raw_conn ? pn_raw_connection_condition (conn -> raw_conn ) : 0 ;
1875
1887
1876
1888
if (!!conn -> error ) {
@@ -1931,7 +1943,7 @@ static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listene
1931
1943
1932
1944
sys_mutex_init (& conn -> activation_lock );
1933
1945
sys_atomic_init (& conn -> core_activation , 0 );
1934
- sys_atomic_init (& conn -> raw_opened , 1 );
1946
+ sys_atomic_init (& conn -> raw_opened , 0 );
1935
1947
1936
1948
conn -> listener_side = true;
1937
1949
conn -> state = LSIDE_INITIAL ;
0 commit comments