Skip to content

Commit

Permalink
Patch refactor:
Browse files Browse the repository at this point in the history
Based on what I've learned doing POC work in the router core and PR
request feedback.  The tls ordinal is leveraged as a group attribute
for the core. This will allow the core to identify which connection in
the group takes precedence. Also removed extra copies of the
correlator string and various code cleanups.
  • Loading branch information
kgiusti committed Feb 28, 2025
1 parent 8a6f770 commit 57a5ddc
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 109 deletions.
2 changes: 1 addition & 1 deletion include/qpid/dispatch/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ extern const char * const QD_CONNECTION_PROPERTY_VERSION_KEY;
extern const char * const QD_CONNECTION_PROPERTY_COST_KEY;
extern const char * const QD_CONNECTION_PROPERTY_ROLE_KEY;
extern const char * const QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY;
extern const char * const QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY;
extern const char * const QD_CONNECTION_PROPERTY_CONN_ID;
extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY;
extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY;
Expand All @@ -165,7 +166,6 @@ extern const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY;
extern const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE;
extern const char * const QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY;
extern const char * const QD_CONNECTION_PROPERTY_ACCESS_ID;
extern const char * const QD_CONNECTION_PROPERTY_TLS_ORDINAL;
/// @}

/** @name Terminus Addresses */
Expand Down
3 changes: 1 addition & 2 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -928,14 +928,13 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
const char *user,
const char *container,
pn_data_t *connection_properties,
uint64_t tls_ordinal,
int ssl_ssf,
bool ssl,
const char *version,
bool streaming_links,
bool connection_trunking);

void qdr_connection_info_set_group_correlator(qdr_connection_info_t *info, const char *correlator);
void qdr_connection_info_set_group(qdr_connection_info_t *info, const char *correlator, uint64_t ordinal);
void qdr_connection_info_set_tls(qdr_connection_info_t *info, bool enabled, char *version, char *ciphers, int ssf);

void qd_adaptor_listener_init(void);
Expand Down
110 changes: 68 additions & 42 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1376,53 +1376,39 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
char rversion[128];
uint64_t connection_id = qd_connection_connection_id(conn);
pn_connection_t *pn_conn = qd_connection_pn(conn);
pn_transport_t *tport = 0;
pn_sasl_t *sasl = 0;
const char *mech = 0;
const char *user = 0;
const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
const char *host = 0;
uint64_t group_ordinal = 0;
const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
char group_correlator[QD_DISCRIMINATOR_SIZE];
char host_local[255];

rversion[0] = 0;
group_correlator[0] = 0;
host_local[0] = 0;

rversion[0] = 0;
conn->strip_annotations_in = false;
conn->strip_annotations_out = false;
if (conn->pn_conn) {
tport = pn_connection_transport(conn->pn_conn);
}
if (tport) {
sasl = pn_sasl(tport);
if(conn->user_id)
user = conn->user_id;
else
user = pn_transport_get_user(tport);
}

if (sasl)
mech = pn_sasl_get_mech(sasl);
qd_router_connection_get_config(conn, &role, &cost, &name,
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

const char *host = 0;
char host_local[255];
const qd_server_config_t *config;
qd_connector_t *connector = qd_connection_connector(conn);

if (connector) {
config = qd_connector_get_config(connector);
const qd_server_config_t *config = qd_connector_get_server_config(connector);
snprintf(host_local, 254, "%s", config->host_port);
host = &host_local[0];
}
else
host = qd_connection_name(conn);


qd_router_connection_get_config(conn, &role, &cost, &name,
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

if (connector && !!connector->ctor_config->data_connection_count) {
memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
// Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
// value has the most up-to-date security credentials and should take precedence over connections with a lower
// ordinal value.
(void) qd_connector_get_tls_ordinal(connector, &group_ordinal);
memcpy(group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
if (connector->is_data_connector) {
// override the configured role to identify this as a data connection
assert(role == QDR_ROLE_INTER_ROUTER);
role = QDR_ROLE_INTER_ROUTER_DATA;
}
} else {
host = qd_connection_name(conn);
}

// check offered capabilities for streaming link support and connection trunking support
Expand Down Expand Up @@ -1457,10 +1443,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION);
pn_data_rewind(props);
if (pn_data_next(props) && pn_data_type(props) == PN_MAP) {
const size_t num_items = pn_data_get_map(props);
int props_found = 0; // once all props found exit loop

const size_t num_items = pn_data_get_map(props);
const int max_props = 8; // total possible props
int props_found = 0; // once all props found exit loop

pn_data_enter(props);
for (int i = 0; i < num_items / 2 && props_found < 7; ++i) {
for (int i = 0; i < num_items / 2 && props_found < max_props; ++i) {
if (!pn_data_next(props)) break;
if (pn_data_type(props) != PN_SYMBOL) break; // invalid properties map
pn_bytes_t key = pn_data_get_symbol(props);
Expand Down Expand Up @@ -1493,11 +1482,26 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
} else if (key.size == strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY) &&
strncmp(key.start, QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY, key.size) == 0) {
props_found += 1;
assert(!connector); // expect: connector sets correlator, listener consumes it
if (!pn_data_next(props)) break;
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA) {
if (pn_data_type(props) == PN_STRING) {
// pn_bytes is not null terminated
pn_bytes_t gc = pn_data_get_string(props);
strncpy(conn->group_correlator, gc.start, MIN(gc.size, QD_DISCRIMINATOR_SIZE));
size_t len = MIN(gc.size, QD_DISCRIMINATOR_BYTES);
memcpy(group_correlator, gc.start, len);
group_correlator[len] = '\0';
}
}

} else if (key.size == strlen(QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY) &&
strncmp(key.start, QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY, key.size) == 0) {
props_found += 1;
assert(!connector); // expect: connector sets ordinal, listener consumes it
if (!pn_data_next(props)) break;
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA) {
if (pn_data_type(props) == PN_ULONG) {
group_ordinal = pn_data_get_ulong(props);
}
}

Expand Down Expand Up @@ -1530,6 +1534,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool

} else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ACCESS_ID)
&& strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) {
props_found += 1;
if (!pn_data_next(props)) break;
if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) {
vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
Expand All @@ -1539,13 +1544,35 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
// skip this key
if (!pn_data_next(props)) break;
}

// NOTE: if adding more keys update max_props value above!
}
}
}

char *proto = 0;
char *cipher = 0;
int ssl_ssf = 0;
// Gather transport-level information

pn_transport_t *tport = 0;
pn_sasl_t *sasl = 0;
const char *mech = 0;
const char *user = 0;
char *proto = 0;
char *cipher = 0;
int ssl_ssf = 0;

if (conn->pn_conn) {
tport = pn_connection_transport(conn->pn_conn);
}
if (tport) {
sasl = pn_sasl(tport);
if(conn->user_id)
user = conn->user_id;
else
user = pn_transport_get_user(tport);
}

if (sasl)
mech = pn_sasl_get_mech(sasl);

if (conn->ssl) {
proto = qd_tls_session_get_protocol_version(conn->ssl);
Expand All @@ -1567,14 +1594,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
(char*) user,
container,
props,
qd_tls_session_get_profile_ordinal(conn->ssl),
ssl_ssf,
!!conn->ssl,
rversion,
streaming_links,
connection_trunking);

qdr_connection_info_set_group_correlator(connection_info, conn->group_correlator);
qdr_connection_info_set_group(connection_info, group_correlator, group_ordinal);

qdr_connection_opened(router->router_core,
amqp_adaptor.adaptor,
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en
qd_listener_decref(li);
return 0;
}
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
li->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(li->tls_config);
}

Expand Down
28 changes: 11 additions & 17 deletions src/adaptors/amqp/qd_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ static void decorate_connection(qd_connection_t *ctx)

pn_data_put_symbol(pn_connection_properties(conn),
pn_bytes(strlen(QD_CONNECTION_PROPERTY_CONN_ID), QD_CONNECTION_PROPERTY_CONN_ID));
qd_connection_t *qd_conn = pn_connection_get_context(conn);
pn_data_put_int(pn_connection_properties(conn), qd_conn->connection_id);
pn_data_put_int(pn_connection_properties(conn), ctx->connection_id);

if (config && config->inter_router_cost > 1) {
pn_data_put_symbol(pn_connection_properties(conn),
Expand All @@ -146,16 +145,22 @@ static void decorate_connection(qd_connection_t *ctx)
pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA);
}

if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) {
// The connector-side assigns the group correlator and ordinal values and passes to the listener side
//
if (ctx->connector && !!ctx->connector->ctor_config->group_correlator[0]) {
uint64_t tls_ordinal;
const qd_connector_config_t *ctor_config = ctx->connector->ctor_config;
pn_data_put_symbol(pn_connection_properties(conn),
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY));
pn_data_put_string(pn_connection_properties(conn),
pn_bytes(strnlen(ctx->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctx->group_correlator));
pn_bytes(strnlen(ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctor_config->group_correlator));

if (qd_connection_get_tls_ordinal(qd_conn, &tls_ordinal)) {
// Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
// value has the most up-to-date security credentials and should take precedence over connections with a lower
// ordinal value.
if (qd_connector_get_tls_ordinal(ctx->connector, &tls_ordinal)) {
pn_data_put_symbol(pn_connection_properties(conn),
pn_bytes(strlen(QD_CONNECTION_PROPERTY_TLS_ORDINAL), QD_CONNECTION_PROPERTY_TLS_ORDINAL));
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY), QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY));
pn_data_put_ulong(pn_connection_properties(conn), tls_ordinal);
}
}
Expand Down Expand Up @@ -852,14 +857,3 @@ void qd_amqp_connection_set_tracing(bool enable_tracing)
sys_mutex_unlock(&amqp_adaptor.lock);
}
}


bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal)
{
if (qd_conn->ssl) {
*tls_ordinal = qd_tls_session_get_profile_ordinal(qd_conn->ssl);
return true;
}
*tls_ordinal = 0;
return false;
}
10 changes: 0 additions & 10 deletions src/adaptors/amqp/qd_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ struct qd_connection_t {
sys_mutex_t deferred_call_lock;
bool policy_counted;
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
char group_correlator[QD_DISCRIMINATOR_SIZE];
qd_pn_free_link_list_t free_link_list;
bool strip_annotations_in;
bool strip_annotations_out;
Expand Down Expand Up @@ -297,13 +296,4 @@ void qd_connection_transport_tracer(pn_transport_t *transport, const char *messa

bool qd_connection_handle_event(qd_server_t *qd_server, pn_event_t *e, void *context);
bool qd_connection_strip_annotations_in(const qd_connection_t *c);

/**
* Get the value of the TLS ordinal that is in use by this connection.
*
* @return True if the TLS ordinal is configured and tls_ordinal has been set, false if the connection has no TLS
* ordinal.
*/
bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal);

#endif
33 changes: 21 additions & 12 deletions src/adaptors/amqp/qd_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ static void deferred_close(void *context, bool discard)
}


const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c)
const qd_server_config_t *qd_connector_get_server_config(const qd_connector_t *c)
{
return &c->ctor_config->config;
}
Expand All @@ -157,11 +157,11 @@ qd_connector_t *qd_connector_create(qd_connector_config_t *ctor_config, bool is_
connector->reconnect_enabled = true;
connector->is_data_connector = is_data_connector;

connector->ctor_config = ctor_config;
sys_atomic_inc(&ctor_config->ref_count);

connector->conn_index = 1;
connector->state = CTOR_STATE_INIT;
connector->ctor_config = ctor_config;
connector->conn_index = 1;
connector->state = CTOR_STATE_INIT;
connector->tls_ordinal = ctor_config->tls_ordinal;

qd_failover_item_t *item = NEW(qd_failover_item_t);
ZERO(item);
Expand Down Expand Up @@ -379,8 +379,6 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx
sys_atomic_inc(&connector->ref_count);
ctx->connector = connector;
connector->qd_conn = ctx;

strncpy(ctx->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
}


Expand Down Expand Up @@ -502,6 +500,8 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
return 0;
}

const bool is_inter_router = strcmp(ctor_config->config.role, "inter-router") == 0;

//
// If an sslProfile is configured allocate a TLS config to be used by all child connector's connections
//
Expand All @@ -515,21 +515,20 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
// qd_tls2_config() has set the qd_error_message(), which is logged below
goto error;
}
ctor_config->tls_ordinal = qd_tls_config_get_ordinal(ctor_config->tls_config);
ctor_config->tls_ordinal = qd_tls_config_get_ordinal(ctor_config->tls_config);
ctor_config->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(ctor_config->tls_config);
if (strcmp(ctor_config->config.role, "inter-router") == 0) {
if (is_inter_router) {
qd_tls_config_register_update_callback(ctor_config->tls_config, ctor_config,
handle_connector_ssl_profile_mgmt_update);
}
}

// For inter-router connectors create associated inter-router data connectors if configured

if (strcmp(ctor_config->config.role, "inter-router") == 0) {
if (is_inter_router) {
qd_generate_discriminator(ctor_config->group_correlator);
ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd);
if (!!ctor_config->data_connection_count) {
qd_generate_discriminator(ctor_config->group_correlator);

// Add any data connectors to the head of the connectors list first. This allows the
// router control connector to be located at the head of the list.

Expand Down Expand Up @@ -621,3 +620,13 @@ void qd_connector_config_connect(qd_connector_config_t *ctor_config)
}
}


bool qd_connector_get_tls_ordinal(const qd_connector_t *ctor, uint64_t *tls_ordinal)
{
if (!!ctor->ctor_config->tls_config) {
*tls_ordinal = ctor->tls_ordinal;
return true;
}
*tls_ordinal = 0;
return false;
}
Loading

0 comments on commit 57a5ddc

Please sign in to comment.