Skip to content

Commit

Permalink
Fixes skupperproject#1741: add ordinal attributes to sslProfile
Browse files Browse the repository at this point in the history
Include mechanism for propagating updates to affected
connectors. Advertise the connection's ordinal to the peer router via
the properties map in the Open performative.
  • Loading branch information
kgiusti committed Feb 11, 2025
1 parent ec68f0e commit 6b6019b
Show file tree
Hide file tree
Showing 19 changed files with 304 additions and 69 deletions.
1 change: 1 addition & 0 deletions include/qpid/dispatch/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ extern const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY;
extern const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE;
extern const char * const QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY;
extern const char * const QD_CONNECTION_PROPERTY_ACCESS_ID;
extern const char * const QD_CONNECTION_PROPERTY_TLS_ORDINAL;
/// @}

/** @name Terminus Addresses */
Expand Down
1 change: 1 addition & 0 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
const char *user,
const char *container,
pn_data_t *connection_properties,
uint64_t tls_ordinal,
int ssl_ssf,
bool ssl,
const char *version,
Expand Down
58 changes: 53 additions & 5 deletions include/qpid/dispatch/tls_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,17 @@ struct qd_ssl2_profile_t {
char *uid_name_mapping_file;

/**
* version: Version assigned to the current configuration
* oldest_valid_version: Previous sslProfile updates with versions values < oldest_valid_version have expired.
* Certificate Rotation
* ordinal: Identifies the current configuration's revision
* oldest_valid_ordinal: Previous configurations with ordinal values < oldest_valid_ordinal have expired.
*/
long version;
long oldest_valid_version;
uint64_t ordinal;
uint64_t oldest_valid_ordinal;
};

/**
* Create a new TLS qd_tls_config_t instance with the given configuration
* Create a new TLS qd_tls_config_t instance with the given configuration. This is called when a listener/connector
* record is created.
*
* @param ssl_profile_name the name of the sslProfile configuration to use
* @param p_type protocol type for the child connections (TCP or AMQP)
Expand All @@ -117,6 +119,45 @@ qd_tls_config_t *qd_tls_config(const char *ssl_profile_name,
void qd_tls_config_decref(qd_tls_config_t *config);


/**
* Get the values of the ordinal/oldestValidOrdinal assocated with the TLS configuration.
*
* Note: To avoid races this function can only be called from the context of the management thread.
*
* @param config The qd_tls_config_t to query.
* @return the value for the ordinal/lastValidOrdinal sslProfile attributes used by this config
*/
uint64_t qd_tls_config_get_ordinal(const qd_tls_config_t *config);
uint64_t qd_tls_config_get_oldest_valid_ordinal(const qd_tls_config_t *config);


/** Register a callback to monitor updates to the TLS configuration
*
* Register a callback function that will be invoked by the management thread whenever the sslProfile record associated
* with the qd_tls_config_t is updated. Note that the update callback is invoked on the management thread while the
* qd_tls_config is locked. This prevents new TLS sessions from being created using the updated configuration until
* after the update callback returns.
*
* @param update_cb_context Opaque handle passed to update callback function.
* @param update_cb Optional callback when the sslProfile has been updated by management.
*/
typedef void (*qd_tls_config_update_cb_t)(const qd_tls_config_t *config,
void *update_cb_context);
void qd_tls_config_register_update_callback(qd_tls_config_t *config, void *update_cb_context,
qd_tls_config_update_cb_t update_cb);


/**
* Cancel the update callback.
*
* Deregisters the update callback provided in qd_tls_config(). No further calls to the callback will occur on return
* from this call. Can only be called from the context of the management thread.
*
* @param config The qd_tls_config_t whose callback will be cancelled
*/
void qd_tls_config_cancel_update_callback(qd_tls_config_t *config);


/**
* Release a TLS session context.
*
Expand Down Expand Up @@ -153,6 +194,13 @@ char *qd_tls_session_get_protocol_ciphers(const qd_tls_session_t *session);
*/
int qd_tls_session_get_ssf(const qd_tls_session_t *session);

/**
* Get the ordinal of the sslProfile used to create this session
*
* @param session to be queried
* @return the value of the sslProfile ordinal associated with this session.
*/
uint64_t qd_tls_session_get_profile_ordinal(const qd_tls_session_t *session);

/**
* Fill out the given *profile with the configuration from the named sslProfile record.
Expand Down
8 changes: 4 additions & 4 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -725,15 +725,15 @@
"description": "The absolute path to the file containing the unique id to display name mapping",
"create": true
},
"version": {
"ordinal": {
"type": "integer",
"description": "RESERVED FOR FUTURE USE",
"description": "Indicates the revision of the TLS certificates provided. Used by certificate rotation. Each time the TLS certificates associated with this profile are updated the ordinal value will be increased. The most recent version of the TLS credentials will have the highest numerical ordinal value.",
"create": true,
"update": true
},
"oldestValidVersion": {
"oldestValidOrdinal": {
"type": "integer",
"description": "RESERVED FOR FUTURE USE",
"description": "Used by certificate rotation to remove connections that were created using TLS certificates that are no longer considered valid. Any active connections based on TLS certificates an ordinal value less than oldesValidOrdinal will be immediately terminated by the router.",
"create": true,
"update": true
}
Expand Down
1 change: 1 addition & 0 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
(char*) user,
container,
props,
qd_tls_session_get_profile_ordinal(conn->ssl),
ssl_ssf,
!!conn->ssl,
rversion,
Expand Down
6 changes: 4 additions & 2 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en
qd_listener_decref(li);
return 0;
}
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
li->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(li->tls_config);
}

char *fol = qd_entity_opt_string(entity, "failoverUrls", 0);
Expand Down Expand Up @@ -285,8 +287,8 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl

QD_EXPORT qd_connector_config_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_connection_manager_t *cm = qd->connection_manager;
qd_connector_config_t *ctor_config = qd_connector_config_create(qd, entity);
qd_connection_manager_t *cm = qd->connection_manager;
qd_connector_config_t *ctor_config = qd_connector_config_create(qd, entity);
if (!ctor_config) {
return 0;
}
Expand Down
75 changes: 45 additions & 30 deletions src/adaptors/amqp/qd_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,14 @@ static void connection_wake(qd_connection_t *ctx)
}


static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t *config)
/** Setup connection capabilities and properties.
* These are communicated to the peer via the Open performative.
*/
static void decorate_connection(qd_connection_t *ctx)
{
qd_server_t *qd_server = ctx->server;
pn_connection_t *conn = ctx->pn_conn;
//
// Set the container name
//
pn_connection_set_container(conn, qd_server_get_container_name(qd_server));
pn_connection_t *conn = ctx->pn_conn;
const qd_server_config_t *config = qd_connection_config(ctx);

//
// Advertise our container capabilities.
//
{
// offered: extension capabilities this router supports
pn_data_t *ocaps = pn_connection_offered_capabilities(conn);
Expand Down Expand Up @@ -151,10 +147,17 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t *
}

if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) {
uint64_t tls_ordinal;
pn_data_put_symbol(pn_connection_properties(conn),
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY));
pn_data_put_string(pn_connection_properties(conn),
pn_bytes(strnlen(ctx->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctx->group_correlator));

if (qd_connection_get_tls_ordinal(qd_conn, &tls_ordinal)) {
pn_data_put_symbol(pn_connection_properties(conn),
pn_bytes(strlen(QD_CONNECTION_PROPERTY_TLS_ORDINAL), QD_CONNECTION_PROPERTY_TLS_ORDINAL));
pn_data_put_ulong(pn_connection_properties(conn), tls_ordinal);
}
}

if (ctx->listener && !!ctx->listener->vflow_record) {
Expand Down Expand Up @@ -246,6 +249,8 @@ void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, const qd_serv
{
ctx->pn_conn = pn_connection();
assert(ctx->pn_conn);

pn_connection_set_container(ctx->pn_conn, qd_server_get_container_name(server));
sys_mutex_init(&ctx->deferred_call_lock);
ctx->role = qd_strdup(config->role);
ctx->server = server;
Expand All @@ -262,18 +267,13 @@ void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, const qd_serv
DEQ_INIT(ctx->free_link_list);
DEQ_INIT(ctx->child_sessions);

// note: setup connector or listener before decorating the connection since
// decoration involves accessing the connection's parent.

if (!!connector) {
assert(!listener);
qd_connector_add_connection(connector, ctx);
} else if (!!listener) {
qd_listener_add_connection(listener, ctx);
}

decorate_connection(ctx, config);

sys_mutex_lock(&amqp_adaptor.lock);
DEQ_INSERT_TAIL(amqp_adaptor.conn_list, ctx);
sys_mutex_unlock(&amqp_adaptor.lock);
Expand Down Expand Up @@ -619,13 +619,15 @@ static bool setup_ssl_sasl_and_open(qd_connection_t *ctx)
pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);

decorate_connection(ctx);
pn_connection_open(ctx->pn_conn);
return true;
}


/* Configure the transport once it is bound to the connection */
static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
static void on_connection_bound(qd_server_t *server, pn_event_t *e)
{
pn_connection_t *pn_conn = pn_event_connection(e);
qd_connection_t *ctx = pn_connection_get_context(pn_conn);
pn_transport_t *tport = pn_connection_transport(pn_conn);
Expand All @@ -643,9 +645,21 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
pn_transport_set_tracer(tport, transport_tracer);
}

const qd_server_config_t *config = NULL;
const qd_server_config_t *config = qd_connection_config(ctx);
assert(config);

//
// Common transport configuration.
//
pn_transport_set_max_frame(tport, config->max_frame_size);
pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
// pn_transport_set_channel_max sets the maximum session *identifier*, not the total number of sessions. Thus Proton
// will allow sessions with identifiers [0..max_sessions], which is one greater than the value we pass to
// pn_transport_set_channel_max. So to limit the maximum number of simultaineous sessions to config->max_sessions we
// have to decrement it by one for Proton.
pn_transport_set_channel_max(tport, config->max_sessions - 1);

if (ctx->listener) { /* Accepting an incoming connection */
config = &ctx->listener->config;
const char *name = config->host_port;
pn_transport_set_server(tport);
set_rhost_port(ctx);
Expand Down Expand Up @@ -676,13 +690,14 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
pn_transport_require_auth(tport, config->requireAuthentication);
pn_transport_require_encryption(tport, config->requireEncryption);
pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);
decorate_connection(ctx);

// This log statement is kept at INFO level because this shows the inter-router
// connections and that is useful when debugging router issues.
qd_log(LOG_SERVER, QD_LOG_INFO, "[C%" PRIu64 "] Accepted connection to %s from %s",
ctx->connection_id, name, ctx->rhost_port);

} else if (ctx->connector) { /* Establishing an outgoing connection */
config = &ctx->connector->ctor_config->config;
if (!setup_ssl_sasl_and_open(ctx)) {
qd_log(LOG_SERVER, QD_LOG_ERROR, "[C%" PRIu64 "] Connection aborted due to internal setup error",
ctx->connection_id);
Expand All @@ -695,17 +710,6 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "unknown Connection");
return;
}

//
// Common transport configuration.
//
pn_transport_set_max_frame(tport, config->max_frame_size);
pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
// pn_transport_set_channel_max sets the maximum session *identifier*, not the total number of sessions. Thus Proton
// will allow sessions with identifiers [0..max_sessions], which is one greater than the value we pass to
// pn_transport_set_channel_max. So to limit the maximum number of simultaineous sessions to config->max_sessions we
// have to decrement it by one for Proton.
pn_transport_set_channel_max(tport, config->max_sessions - 1);
}

void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_connection_t *pn_conn, qd_connection_t *qd_conn);
Expand Down Expand Up @@ -848,3 +852,14 @@ void qd_amqp_connection_set_tracing(bool enable_tracing)
sys_mutex_unlock(&amqp_adaptor.lock);
}
}


bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal)
{
if (qd_conn->ssl) {
*tls_ordinal = qd_tls_session_get_profile_ordinal(qd_conn->ssl);
return true;
}
*tls_ordinal = 0;
return false;
}
9 changes: 9 additions & 0 deletions src/adaptors/amqp/qd_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,13 @@ void qd_connection_transport_tracer(pn_transport_t *transport, const char *messa

bool qd_connection_handle_event(qd_server_t *qd_server, pn_event_t *e, void *context);
bool qd_connection_strip_annotations_in(const qd_connection_t *c);

/**
* Get the value of the TLS ordinal that is in use by this connection.
*
* @return True if the TLS ordinal is configured and tls_ordinal has been set, false if the connection has no TLS
* ordinal.
*/
bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal);

#endif
Loading

0 comments on commit 6b6019b

Please sign in to comment.