Skip to content

Commit

Permalink
Fixes #1743: implement TLS ordinal connection management
Browse files Browse the repository at this point in the history
Based on what I've learned doing POC work in the router core and PR
request feedback.  The tls ordinal is leveraged as a group attribute
for the core. This will allow the core to identify which connection in
the group takes precedence. Also removed extra copies of the
correlator string and various code cleanups.
  • Loading branch information
kgiusti committed Mar 10, 2025
1 parent f7160bf commit f026e00
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 93 deletions.
12 changes: 12 additions & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,18 @@
"type": "integer",
"graph": true,
"description": "The number of seconds since a delivery was sent on this connection. Will display a - (dash) if no deliveries have been sent on the connection."
},
"tlsOrdinal": {
"description": "If the ssl attribute is true this attribute shows the value of the TLS ordinal in use by the connection. This value reflects the value of the ordinal attribute from the sslProfile that was used when the TLS session was created.",
"type": "integer"
},
"groupCorrelationId": {
"description": "The identifier used to group inter-router and inter-router-data connections generated by the same connector.",
"type": "string"
},
"groupOrdinal": {
"description": "Indicates the precedence of the connection within the group. Connections with the highest ordinal value within the group will be used to forward new message streams.",
"type": "integer"
}
}
},
Expand Down
10 changes: 2 additions & 8 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,8 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl
// TODO(kgiusti): inter-router connections may have several qd_connector_ts active due to the router data connection
// count configuration. However we can only report 1 connector via management. It would be more accurate to report
// all connectors associated with this management entity
sys_mutex_lock(&ctor_config->lock);
assert(sys_thread_role(0) == SYS_THREAD_MAIN || sys_thread_proactor_mode() == SYS_THREAD_PROACTOR_MODE_TIMER); // only mgmt thread can access connectors list
connector = DEQ_HEAD(ctor_config->connectors);
if (connector) {
// prevent I/O thread from freeing connector while it is being accessed
sys_atomic_inc(&connector->ref_count);
}
sys_mutex_unlock(&ctor_config->lock);

if (connector) {
int i = 1;
Expand Down Expand Up @@ -275,7 +270,6 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl
}

sys_mutex_unlock(&connector->lock);
qd_connector_decref(connector); // release local reference
free(failover_info);
} else {
qd_error(QD_ERROR_NOT_FOUND, "No active connector present");
Expand Down Expand Up @@ -369,7 +363,7 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd)
}

while (ctor_config) {
qd_connector_config_connect(ctor_config);
qd_connector_config_activate(ctor_config);
ctor_config = DEQ_NEXT(ctor_config);
}

Expand Down
225 changes: 177 additions & 48 deletions src/adaptors/amqp/qd_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,59 @@ ALLOC_DEFINE(qd_connector_config_t);
#define ASSERT_MGMT_THREAD assert(sys_thread_role(0) == SYS_THREAD_MAIN || sys_thread_proactor_mode() == SYS_THREAD_PROACTOR_MODE_TIMER)


/** Disable reconnect
*
* This disables the connectors ability to reconnect a connection after it has failed.
*
* @param ctor the connector to disable
* @param active set to true if the connector has an active connection, false if there is no associated connection.
*/
static void qd_connector_disable_reconnect(qd_connector_t *ctor, bool *active)
{
qd_timer_t *timer = 0;

sys_mutex_lock(&ctor->lock);
timer = ctor->reconnect_timer;
ctor->reconnect_timer = 0;
*active = !!ctor->qd_conn;
sys_mutex_unlock(&ctor->lock);

// Cannot free the timer while holding the lock because the timer handler will take the lock
qd_timer_free(timer);
}


/**
* Scan the connector_config connectors and delete any that are no longer active
*/
static void qd_connector_config_cleanup_conns(void *context)
{
ASSERT_MGMT_THREAD; // only the mgmt thread can modify the connector list!

qd_connector_config_t *ctor_config = (qd_connector_config_t *) context;
qd_connector_t *ctor = DEQ_HEAD(ctor_config->connectors);

while (ctor) {
bool can_delete;

sys_mutex_lock(&ctor->lock);
// connection is gone and reconnect has been disabled
can_delete = !ctor->reconnect_timer && !ctor->qd_conn;
sys_mutex_unlock(&ctor->lock);

if (can_delete) {
qd_connector_t *delete_me = ctor;
ctor = DEQ_NEXT(ctor);
DEQ_REMOVE(ctor_config->connectors, delete_me);
qd_connector_close(delete_me);
qd_connector_decref(delete_me);
} else {
ctor = DEQ_NEXT(ctor);
}
}
}


static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_REQ(ct->lock)
{
qd_failover_item_t *item = DEQ_HEAD(ct->conn_info_list);
Expand Down Expand Up @@ -102,25 +155,30 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_
//
static void try_open_cb(void *context)
{
qd_connector_t *ct = (qd_connector_t*) context;
qd_connector_t *ctor = (qd_connector_t*) context;

// Allocate connection before taking connector lock to avoid
// CONNECTOR - ENTITY_CACHE lock inversion deadlock window.
qd_connection_t *ctx = new_qd_connection_t();
ZERO(ctx);

sys_mutex_lock(&ct->lock);

if (ct->state == CTOR_STATE_CONNECTING || ct->state == CTOR_STATE_INIT) {
// else deleted or failed - on failed wait until after connection is freed
// and state is set to CTOR_STATE_CONNECTING (timer is rescheduled then)
try_open_lh(ct, ctx);
ctx = 0; // owned by ct
qd_connection_t *qd_conn = new_qd_connection_t();
ZERO(qd_conn);

sys_mutex_lock(&ctor->lock);

if (ctor->reconnect_timer) { // reconnect has not been cancelled
switch (ctor->state) {
case CTOR_STATE_CONNECTING:
case CTOR_STATE_INIT:
try_open_lh(ctor, qd_conn);
qd_conn = 0; // ownership moved to ctor->qd_conn
break;
default:
break;
}
}

sys_mutex_unlock(&ct->lock);
sys_mutex_unlock(&ctor->lock);

free_qd_connection_t(ctx); // noop if ctx == 0
free_qd_connection_t(qd_conn); // noop if qd_conn == 0
}


Expand Down Expand Up @@ -153,8 +211,7 @@ qd_connector_t *qd_connector_create(qd_connector_config_t *ctor_config, bool is_
DEQ_ITEM_INIT(connector);

sys_mutex_init(&connector->lock);
connector->timer = qd_timer(amqp_adaptor.dispatch, try_open_cb, connector);
connector->reconnect_enabled = true;
connector->reconnect_timer = qd_timer(amqp_adaptor.dispatch, try_open_cb, connector);
connector->is_data_connector = is_data_connector;

sys_atomic_inc(&ctor_config->ref_count);
Expand Down Expand Up @@ -205,21 +262,16 @@ const char *qd_connector_policy_vhost(const qd_connector_t* ct)
}


bool qd_connector_connect(qd_connector_t *ct)
void qd_connector_activate(qd_connector_t *ct)
{
sys_mutex_lock(&ct->lock);
if (ct->state != CTOR_STATE_DELETED) {
// expect: do not attempt to connect an already connected qd_connection
assert(ct->qd_conn == 0);
ct->qd_conn = 0;
ct->delay = 0;
ct->state = CTOR_STATE_CONNECTING;
qd_timer_schedule(ct->timer, ct->delay);
sys_mutex_unlock(&ct->lock);
return true;
}
// expect: do not attempt to connect an already connected qd_connection
assert(ct->state == CTOR_STATE_INIT);
ct->qd_conn = 0;
ct->delay = 0;
ct->state = CTOR_STATE_CONNECTING;
qd_timer_schedule(ct->reconnect_timer, ct->delay);
sys_mutex_unlock(&ct->lock);
return false;
}


Expand All @@ -234,8 +286,8 @@ void qd_connector_close(qd_connector_t *ct)
void *dct = qd_connection_new_qd_deferred_call_t();

sys_mutex_lock(&ct->lock);
timer = ct->timer;
ct->timer = 0;
timer = ct->reconnect_timer;
ct->reconnect_timer = 0;
ct->state = CTOR_STATE_DELETED;
qd_connection_t *conn = ct->qd_conn;
if (conn && conn->pn_conn) {
Expand All @@ -261,7 +313,7 @@ void qd_connector_decref(qd_connector_t* connector)
qd_connector_config_decref(connector->ctor_config);
vflow_end_record(connector->vflow_record);
connector->vflow_record = 0;
qd_timer_free(connector->timer);
qd_timer_free(connector->reconnect_timer);
sys_mutex_free(&connector->lock);
sys_atomic_destroy(&connector->ref_count);

Expand Down Expand Up @@ -412,7 +464,9 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const
connector->qd_conn = 0;
ctx->connector = 0;

if (connector->state != CTOR_STATE_DELETED) {
// Should we reconnect?
if (!!connector->reconnect_timer) {
assert(connector->state != CTOR_STATE_DELETED);
// Increment the connection index by so that we can try connecting to the failover url (if any).
bool has_failover = qd_connector_has_failover_info(connector);
long delay = connector->delay;
Expand All @@ -424,7 +478,14 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const
delay = 1000;
}
connector->state = CTOR_STATE_CONNECTING;
qd_timer_schedule(connector->timer, delay);
qd_timer_schedule(connector->reconnect_timer, delay);

} else {
// Reconnect has been disabled and connection has dropped: this connector needs to be deleted. That can only be
// done via the connector config while running on the management thread (timer). By delaying 10msec we avoid
// thrashing the timer should several connections fail at once (think remote router drop).
assert(connector->ctor_config);
qd_timer_schedule(connector->ctor_config->cleanup_timer, 10);
}
sys_mutex_unlock(&connector->lock);

Expand All @@ -439,27 +500,90 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const
//
static void handle_connector_ssl_profile_mgmt_update(const qd_tls_config_t *config, void *context)
{
ASSERT_MGMT_THREAD;
ASSERT_MGMT_THREAD; // only the mgmt thread can modify the connector list

uint64_t new_ordinal = qd_tls_config_get_ordinal(config);
uint64_t new_oldest_ordinal = qd_tls_config_get_oldest_valid_ordinal(config);
qd_connector_config_t *ctor_config = (qd_connector_config_t *) context;

if (new_ordinal > ctor_config->tls_ordinal) {
// TBD
qd_log(LOG_SERVER, QD_LOG_DEBUG,
"Connector %s new ordinal: %"PRIu64", previous: %"PRIu64,
ctor_config->config.name, new_ordinal, ctor_config->tls_ordinal);
ctor_config->tls_ordinal = new_ordinal;
}
// Expect: this callback is only registered for inter-router connectors (for now)
assert(strcmp(ctor_config->config.role, "inter-router") == 0);

// destroy all connections whose connectors use an expired TLS ordinal:

if (new_oldest_ordinal > ctor_config->tls_oldest_valid_ordinal) {
// TBD

qd_log(LOG_SERVER, QD_LOG_DEBUG,
"Connector %s new oldest valid ordinal: %"PRIu64", previous: %"PRIu64,
"Connector %s new oldest valid TLS ordinal: %"PRIu64", previous: %"PRIu64,
ctor_config->config.name, new_oldest_ordinal, ctor_config->tls_oldest_valid_ordinal);

// Spin through all connectors and delete those with an expired ordinal

qd_connector_t *ctor = DEQ_HEAD(ctor_config->connectors);
while (ctor) {
if (ctor->tls_ordinal < new_oldest_ordinal) {
qd_connector_t *delete_me = ctor;
ctor = DEQ_NEXT(ctor);
DEQ_REMOVE(ctor_config->connectors, delete_me);
qd_connector_close(delete_me);
qd_connector_decref(delete_me);
} else {
ctor = DEQ_NEXT(ctor);
}
}

ctor_config->tls_oldest_valid_ordinal = new_oldest_ordinal;
}

if (new_ordinal > ctor_config->tls_ordinal) {

qd_log(LOG_SERVER, QD_LOG_DEBUG,
"Connector %s new TLS ordinal: %"PRIu64", previous: %"PRIu64,
ctor_config->config.name, new_ordinal, ctor_config->tls_ordinal);

// Prevent existing connections that use older TLS ordinals from reconnecting. The goal is to allow older (but
// still valid) TLS sessions to pass existing flows but avoid sending new flows over them.

qd_connector_t *ctor = DEQ_HEAD(ctor_config->connectors);
while (ctor) {
bool active;
qd_connector_disable_reconnect(ctor, &active);
if (!active) {
// If the connector does not have an active connection it can be deleted
qd_connector_t *delete_me = ctor;
ctor = DEQ_NEXT(ctor);
DEQ_REMOVE(ctor_config->connectors, delete_me);
qd_connector_close(delete_me);
qd_connector_decref(delete_me);
} else {
ctor = DEQ_NEXT(ctor);
}
}

// Create a new set of connectors that will use the updated TLS credentials

ctor_config->tls_ordinal = new_ordinal;

for (int i = 0; i < ctor_config->data_connection_count; i++) {
ctor = qd_connector_create(ctor_config, true);
if (!ctor) {
// TODO: this should not happen in real life but if it does it cannot be handled gracefully. Complain
// loudly.
qd_log(LOG_SERVER, QD_LOG_ERROR, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name);
} else {
DEQ_INSERT_HEAD(ctor_config->connectors, ctor);
qd_connector_activate(ctor);
}
}

ctor = qd_connector_create(ctor_config, false);
if (!ctor) {
qd_log(LOG_SERVER, QD_LOG_ERROR, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name);
} else {
DEQ_INSERT_HEAD(ctor_config->connectors, ctor);
qd_connector_activate(ctor);
}
}
}


Expand All @@ -468,7 +592,7 @@ static void handle_connector_ssl_profile_mgmt_update(const qd_tls_config_t *conf
*/
qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity)
{
ASSERT_MGMT_THREAD;
ASSERT_MGMT_THREAD; // only the mgmt thread can modify the connector list!

qd_connector_config_t *ctor_config = new_qd_connector_config_t();
if (!ctor_config) {
Expand All @@ -483,9 +607,9 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
ZERO(ctor_config);
DEQ_ITEM_INIT(ctor_config);
sys_atomic_init(&ctor_config->ref_count, 1); // for caller
sys_mutex_init(&ctor_config->lock);
ctor_config->server = qd_dispatch_get_server(qd);
DEQ_INIT(ctor_config->connectors);
ctor_config->cleanup_timer = qd_timer(amqp_adaptor.dispatch, qd_connector_config_cleanup_conns, ctor_config);

if (qd_server_config_load(&ctor_config->config, entity, false) != QD_ERROR_NONE) {
qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message());
Expand Down Expand Up @@ -570,7 +694,7 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t

void qd_connector_config_delete(qd_connector_config_t *ctor_config)
{
ASSERT_MGMT_THREAD;
ASSERT_MGMT_THREAD; // only the mgmt thread can modify the connector list!

qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors);
while (ct) {
Expand Down Expand Up @@ -599,7 +723,10 @@ void qd_connector_config_decref(qd_connector_config_t *ctor_config)
if (rc == 1) {
// Expect: all connectors hold the ref_count so this must be empty
assert(DEQ_IS_EMPTY(ctor_config->connectors));
sys_mutex_free(&ctor_config->lock);

// free the timer first otherwise the callback can run and attempt to access ctor_config while it is being torn
// down:
qd_timer_free(ctor_config->cleanup_timer);
sys_atomic_destroy(&ctor_config->ref_count);
free(ctor_config->policy_vhost);
qd_tls_config_decref(ctor_config->tls_config);
Expand All @@ -610,12 +737,14 @@ void qd_connector_config_decref(qd_connector_config_t *ctor_config)


// Initiate connections on all child connectors
void qd_connector_config_connect(qd_connector_config_t *ctor_config)
void qd_connector_config_activate(qd_connector_config_t *ctor_config)
{
ASSERT_MGMT_THREAD; // only the mgmt thread can access the connector list!

if (!ctor_config->activated) {
ctor_config->activated = true;
for (qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); !!ct; ct = DEQ_NEXT(ct)) {
qd_connector_connect(ct);
qd_connector_activate(ct);
}
}
}
Expand Down
Loading

0 comments on commit f026e00

Please sign in to comment.