Skip to content

Commit

Permalink
Issue skupperproject#1748: move inter-router links from qdr_core_t to…
Browse files Browse the repository at this point in the history
… qdr_connection_t
  • Loading branch information
kgiusti committed Mar 5, 2025
1 parent c6fc283 commit 79ae9d9
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 49 deletions.
43 changes: 27 additions & 16 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -1081,10 +1081,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
if (qd_bitmask_valid_bit_value(conn->mask_bit)) {
if (link->link_type == QD_LINK_CONTROL)
core->control_links_by_mask_bit[conn->mask_bit] = 0;
conn->control_links[link->link_direction] = 0;
if (link->link_type == QD_LINK_ROUTER) {
if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority])
core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
if (link == conn->data_links.link[link->priority])
conn->data_links.link[link->priority] = 0;
}
}

Expand Down Expand Up @@ -1815,7 +1815,7 @@ static void qdr_connection_notify_closed_CT(qdr_core_t *core, qdr_action_t *acti
//
if (conn->role == QDR_ROLE_INTER_ROUTER) {
assert(qd_bitmask_valid_bit_value(conn->mask_bit));
qdr_reset_sheaf(core, conn->mask_bit);
qdr_reset_sheaf(conn);
qd_bitmask_set_bit(core->neighbor_free_mask, conn->mask_bit);
core->rnode_conns_by_mask_bit[conn->mask_bit] = 0;
}
Expand Down Expand Up @@ -1886,24 +1886,31 @@ static void qdr_connection_notify_closed_CT(qdr_core_t *core, qdr_action_t *acti


//
// Handle the attachment and detachment of an inter-router control link
// Handle the attachment and detachment of an inter-router control link.
//
static void qdr_attach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER) {
assert(link->link_type == QD_LINK_CONTROL);
assert(conn->role == QDR_ROLE_INTER_ROUTER);

conn->control_links[link->link_direction] = link;
if (link->link_direction == QD_OUTGOING) {
link->owning_addr = core->hello_addr;
qdr_add_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
core->control_links_by_mask_bit[conn->mask_bit] = link;
}
}


static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER) {
assert(link->link_type == QD_LINK_CONTROL);
assert(conn->role == QDR_ROLE_INTER_ROUTER);
assert(conn->control_links[link->link_direction] == link);

conn->control_links[link->link_direction] = 0;
if (link->link_direction == QD_OUTGOING) {
qdr_del_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
link->owning_addr = 0;
core->control_links_by_mask_bit[conn->mask_bit] = 0;
qdr_post_link_lost_CT(core, conn->mask_bit);
}
}
Expand All @@ -1914,16 +1921,18 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
//
static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
assert(conn->role == QDR_ROLE_INTER_ROUTER);
assert(link->link_type == QD_LINK_ROUTER);

// The first 2 x QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over
// the inter-router connection are the shared priority links. These links
// are attached in priority order starting at zero.
if (link->link_direction == QD_OUTGOING) {
int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count;
int next_pri = conn->data_links.count;
if (next_pri < QDR_N_PRIORITIES) {
link->priority = next_pri;
core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link;
core->data_links_by_mask_bit[conn->mask_bit].count += 1;
conn->data_links.link[next_pri] = link;
conn->data_links.count += 1;
}
} else {
if (conn->next_pri < QDR_N_PRIORITIES) {
Expand All @@ -1935,10 +1944,12 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd

static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
assert(conn->role == QDR_ROLE_INTER_ROUTER);
assert(link->link_type == QD_LINK_ROUTER);

// if this link is in the priority sheaf it needs to be removed
if (conn->role == QDR_ROLE_INTER_ROUTER)
if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority])
core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
if (link == conn->data_links.link[link->priority])
conn->data_links.link[link->priority] = 0;
}


Expand Down Expand Up @@ -2092,7 +2103,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// listeners for normal traffic but will not prevent routed-links from being established.
//
if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT &&
core->control_links_by_mask_bit[conn->mask_bit] == 0) {
conn->control_links[QD_OUTGOING] == 0) {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE);
qdr_terminus_free(source);
qdr_terminus_free(target);
Expand Down
11 changes: 8 additions & 3 deletions src/router_core/forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
ALLOC_DEFINE(qdr_forward_deliver_info_t);


// get the control link for a given inter-router connection
// get the outgoing control link for a given inter-router connection
static inline qdr_link_t *peer_router_control_link(qdr_core_t *core, int conn_mask)
{
return (conn_mask >= 0) ? core->control_links_by_mask_bit[conn_mask] : 0;
qdr_connection_t *conn = (conn_mask >= 0) ? core->rnode_conns_by_mask_bit[conn_mask] : 0;
return (!!conn) ? conn->control_links[QD_OUTGOING] : 0;
}


Expand All @@ -54,11 +55,15 @@ static inline qdr_link_t *peer_router_data_link(qdr_core_t *core,
if (conn_mask < 0 || priority < 0)
return 0;

qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_mask];
if (!conn)
return 0;

// Try to return the requested priority link, but if it does
// not exist, return the closest one that is lower.
qdr_link_t * link = 0;
while (1) {
if ((link = core->data_links_by_mask_bit[conn_mask].links[priority]))
if ((link = conn->data_links.link[priority]))
return link;
if (-- priority < 0)
return 0;
Expand Down
27 changes: 13 additions & 14 deletions src/router_core/route_tables.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,15 @@ void qdr_route_table_setup_CT(qdr_core_t *core)

core->neighbor_free_mask = qd_bitmask(1);

core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
DEQ_INIT(core->unallocated_group_members);
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width());
DEQ_INIT(core->unallocated_group_members);

for (int idx = 0; idx < qd_bitmask_width(); idx++) {
core->routers_by_mask_bit[idx] = 0;
core->control_links_by_mask_bit[idx] = 0;
core->data_links_by_mask_bit[idx].count = 0;
core->rnode_conns_by_mask_bit[idx] = 0;
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
core->data_links_by_mask_bit[idx].links[priority] = 0;
}
core->group_correlator_by_maskbit[idx] = (char*) malloc(QD_DISCRIMINATOR_SIZE);
core->routers_by_mask_bit[idx] = 0;
core->rnode_conns_by_mask_bit[idx] = 0;
core->group_correlator_by_maskbit[idx] = (char*) qd_malloc(QD_DISCRIMINATOR_SIZE);
core->group_correlator_by_maskbit[idx][0] = '\0';
}
}
Expand Down Expand Up @@ -441,7 +434,13 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard
return;
}

if (core->control_links_by_mask_bit[conn_maskbit] == 0) {
qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_maskbit];
if (conn == 0) {
qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid conn reference: %d", conn_maskbit);
return;
}

if (conn->control_links[QD_OUTGOING] == 0) {
qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", conn_maskbit);
return;
}
Expand Down
9 changes: 3 additions & 6 deletions src/router_core/router_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ void qdr_core_free(qdr_core_t *core)
assert(DEQ_IS_EMPTY(core->streaming_connections));

if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit);
if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit);
if (core->group_correlator_by_maskbit) {
Expand Down Expand Up @@ -1063,11 +1061,10 @@ uint64_t qdr_identifier(qdr_core_t* core)
return id;
}

void qdr_reset_sheaf(qdr_core_t *core, uint8_t n)

void qdr_reset_sheaf(qdr_connection_t *conn)
{
qdr_priority_sheaf_t *sheaf = core->data_links_by_mask_bit + n;
sheaf->count = 0;
memset(sheaf->links, 0, QDR_N_PRIORITIES * sizeof(void *));
conn->data_links = (qdr_priority_sheaf_t) {0};
}


Expand Down
22 changes: 12 additions & 10 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,14 @@ void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
const char *get_address_treatment_string(qd_address_treatment_t treatment);

// non-streaming inter-router links sorted by priority
//
typedef struct qdr_priority_sheaf_t {
qdr_link_t *link[QDR_N_PRIORITIES];
int count;
} qdr_priority_sheaf_t;


//
// Connection Information
//
Expand Down Expand Up @@ -695,6 +703,8 @@ struct qdr_connection_t {
qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES];
qdr_connection_info_t *connection_info;
void *user_context; /* Updated from IO thread, use work_lock */
qdr_link_t *control_links[2]; // QD_LINK_CONTROL links [QD_INCOMING/QD_OUTGOING] (inter-router conn only)
qdr_priority_sheaf_t data_links; // links for non-streaming messages (by priority) (inter-router conn only)
qd_conn_oper_status_t oper_status;
qd_conn_admin_status_t admin_status;
qdr_error_t *error;
Expand Down Expand Up @@ -767,11 +777,6 @@ struct qdr_conn_identifier_t {
qdr_auto_link_list_t auto_link_refs;
};

typedef struct qdr_priority_sheaf_t {
qdr_link_t *links[QDR_N_PRIORITIES];
int count;
} qdr_priority_sheaf_t;


struct qdr_protocol_adaptor_t {
DEQ_LINKS(qdr_protocol_adaptor_t);
Expand Down Expand Up @@ -886,8 +891,6 @@ struct qdr_core_t {
qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values)
qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit
qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit
qdr_link_t **control_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
qdr_priority_sheaf_t *data_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit)
char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit
uint64_t cost_epoch;
Expand Down Expand Up @@ -1061,10 +1064,9 @@ void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer);
* Clears the sheaf of priority links in a connection.
* Call this when a connection is being closed, when the mask-bit
* for that sheaf is being returned to the core for re-use.
* @param core Pointer to the core object returned by qd_core()
* @param n uint8_t index for the sheaf to be reset prior to re-use.
* @param conn Pointer to the connection owning the sheaf
*/
void qdr_reset_sheaf(qdr_core_t *core, uint8_t n);
void qdr_reset_sheaf(qdr_connection_t *conn);

/**
* Run in an IO thread.
Expand Down

0 comments on commit 79ae9d9

Please sign in to comment.