Skip to content

Commit

Permalink
Issue skupperproject#1748: Implement inter-router connection group up…
Browse files Browse the repository at this point in the history
…grade
  • Loading branch information
kgiusti committed Mar 10, 2025
1 parent c6fc283 commit 88f102c
Show file tree
Hide file tree
Showing 9 changed files with 719 additions and 191 deletions.
4 changes: 4 additions & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,10 @@
"graph": true,
"description": "The total number of deliveries that have traversed this link."
},
"connectionId": {
"type": "integer",
"description": "The identity of the parent connection."
},
"presettledCount": {
"type": "integer",
"graph": true,
Expand Down
1 change: 1 addition & 0 deletions src/router_core/agent_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const char *qdr_connection_columns[] =
"meshId",
"tlsOrdinal",
"groupCorrelationId",
"groupOrdinal",
0};

const char *CONNECTION_TYPE = "io.skupper.router.connection";
Expand Down
509 changes: 366 additions & 143 deletions src/router_core/connections.c

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions src/router_core/forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
ALLOC_DEFINE(qdr_forward_deliver_info_t);


// get the control link for a given inter-router connection
// get the outgoing control link for a given inter-router connection
static inline qdr_link_t *peer_router_control_link(qdr_core_t *core, int conn_mask)
{
return (conn_mask >= 0) ? core->control_links_by_mask_bit[conn_mask] : 0;
qdr_connection_t *conn = (conn_mask >= 0) ? core->rnode_conns_by_mask_bit[conn_mask] : 0;
return (!!conn) ? conn->control_links[QD_OUTGOING] : 0;
}


Expand All @@ -54,11 +55,15 @@ static inline qdr_link_t *peer_router_data_link(qdr_core_t *core,
if (conn_mask < 0 || priority < 0)
return 0;

qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_mask];
if (!conn)
return 0;

// Try to return the requested priority link, but if it does
// not exist, return the closest one that is lower.
qdr_link_t * link = 0;
while (1) {
if ((link = core->data_links_by_mask_bit[conn_mask].links[priority]))
if ((link = conn->data_links.link[priority]))
return link;
if (-- priority < 0)
return 0;
Expand Down
31 changes: 16 additions & 15 deletions src/router_core/route_tables.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,23 +240,18 @@ void qdr_route_table_setup_CT(qdr_core_t *core)

core->neighbor_free_mask = qd_bitmask(1);

core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
core->pending_rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width());
DEQ_INIT(core->unallocated_group_members);
core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width());

for (int idx = 0; idx < qd_bitmask_width(); idx++) {
core->routers_by_mask_bit[idx] = 0;
core->control_links_by_mask_bit[idx] = 0;
core->data_links_by_mask_bit[idx].count = 0;
core->rnode_conns_by_mask_bit[idx] = 0;
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
core->data_links_by_mask_bit[idx].links[priority] = 0;
}
core->group_correlator_by_maskbit[idx] = (char*) malloc(QD_DISCRIMINATOR_SIZE);
core->group_correlator_by_maskbit[idx][0] = '\0';
core->routers_by_mask_bit[idx] = 0;
core->rnode_conns_by_mask_bit[idx] = 0;
core->pending_rnode_conns_by_mask_bit[idx] = 0;
core->group_correlator_by_maskbit[idx] = (char*) qd_malloc(QD_DISCRIMINATOR_SIZE);
core->group_correlator_by_maskbit[idx][0] = '\0';
}
}
}
Expand Down Expand Up @@ -441,7 +436,13 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard
return;
}

if (core->control_links_by_mask_bit[conn_maskbit] == 0) {
qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_maskbit];
if (conn == 0) {
qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid conn reference: %d", conn_maskbit);
return;
}

if (conn->control_links[QD_OUTGOING] == 0) {
qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", conn_maskbit);
return;
}
Expand Down
16 changes: 7 additions & 9 deletions src/router_core/router_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,10 @@ void qdr_core_free(qdr_core_t *core)
assert(DEQ_IS_EMPTY(core->action_list_background));
assert(DEQ_IS_EMPTY(core->streaming_connections));

if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit);
if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit);
if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit);
if (core->pending_rnode_conns_by_mask_bit) free(core->pending_rnode_conns_by_mask_bit);
if (core->group_correlator_by_maskbit) {
for (int idx = 0; idx < qd_bitmask_width(); idx++) {
free(core->group_correlator_by_maskbit[idx]);
Expand Down Expand Up @@ -1063,11 +1062,10 @@ uint64_t qdr_identifier(qdr_core_t* core)
return id;
}

void qdr_reset_sheaf(qdr_core_t *core, uint8_t n)

void qdr_reset_sheaf(qdr_connection_t *conn)
{
qdr_priority_sheaf_t *sheaf = core->data_links_by_mask_bit + n;
sheaf->count = 0;
memset(sheaf->links, 0, QDR_N_PRIORITIES * sizeof(void *));
conn->data_links = (qdr_priority_sheaf_t) {0};
}


Expand Down
44 changes: 24 additions & 20 deletions src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,14 @@ void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
const char *get_address_treatment_string(qd_address_treatment_t treatment);

// Priority sheaf: the non-streaming inter-router data links of a connection,
// held in one slot per message priority. A slot is 0 when there is no link
// for that priority.
//
typedef struct qdr_priority_sheaf_t {
qdr_link_t *link[QDR_N_PRIORITIES];  // indexed by priority; the forwarder falls back to the closest lower priority when a slot is empty
int count;  // NOTE(review): presumably the number of populated slots in link[] -- confirm against connections.c
} qdr_priority_sheaf_t;


//
// Connection Information
//
Expand Down Expand Up @@ -682,26 +690,29 @@ struct qdr_connection_t {
bool closed; // This bit is used in the case where a client is trying to force close this connection.
uint8_t next_pri; // for incoming inter-router data links
qdr_connection_role_t role;
int inter_router_cost;
qdr_conn_identifier_t *conn_id;
qdr_conn_identifier_t *alt_conn_id;
bool strip_annotations_in;
bool strip_annotations_out;
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
bool has_streaming_links; ///< one or more of this connection's links are for streaming messages
int inter_router_cost;
int link_capacity;
int mask_bit; ///< set only if inter-router connection
int mask_bit; ///< set only if inter-router control connection
int group_parent_mask_bit; ///< if inter-router data connection maskbit of group parent inter-router control conn
qdr_connection_work_list_t work_list;
sys_mutex_t work_lock;
qdr_link_ref_list_t links;
qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES];
qdr_connection_info_t *connection_info;
void *user_context; /* Updated from IO thread, use work_lock */
qdr_link_t *control_links[2]; // QD_LINK_CONTROL links [QD_INCOMING/QD_OUTGOING] (inter-router conn only)
qdr_priority_sheaf_t data_links; // links for non-streaming messages (by priority) (inter-router conn only)
qd_conn_oper_status_t oper_status;
qd_conn_admin_status_t admin_status;
qdr_error_t *error;
uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection.
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
bool has_streaming_links; ///< one or more of this connection's links are for streaming messages
qdr_link_list_t streaming_link_pool; ///< pool of links available for streaming messages
const qd_policy_spec_t *policy_spec;
qdr_connection_list_t connection_group; ///< List of associated connection group members
Expand Down Expand Up @@ -767,11 +778,6 @@ struct qdr_conn_identifier_t {
qdr_auto_link_list_t auto_link_refs;
};

typedef struct qdr_priority_sheaf_t {
qdr_link_t *links[QDR_N_PRIORITIES];
int count;
} qdr_priority_sheaf_t;


struct qdr_protocol_adaptor_t {
DEQ_LINKS(qdr_protocol_adaptor_t);
Expand Down Expand Up @@ -882,14 +888,13 @@ struct qdr_core_t {
qdr_address_t *router_addr_T;
qdr_address_t *routerma_addr_T;

qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest
qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values)
qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit
qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit
qdr_link_t **control_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
qdr_priority_sheaf_t *data_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit)
char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit
qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest
qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values)
qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit
qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit
qdr_connection_t **pending_rnode_conns_by_mask_bit; ///< higher precedence inter-router conns pending upgrade [conn->mask_bit]
qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit)
char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit
uint64_t cost_epoch;

uint64_t next_tag;
Expand Down Expand Up @@ -1061,10 +1066,9 @@ void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer);
* Clears the sheaf of priority links in a connection.
* Call this when a connection is being closed, when the mask-bit
* for that sheaf is being returned to the core for re-use.
* @param core Pointer to the core object returned by qd_core()
* @param n uint8_t index for the sheaf to be reset prior to re-use.
* @param conn Pointer to the connection owning the sheaf
*/
void qdr_reset_sheaf(qdr_core_t *core, uint8_t n);
void qdr_reset_sheaf(qdr_connection_t *conn);

/**
* Run in an IO thread.
Expand Down
109 changes: 108 additions & 1 deletion tests/system_tests_cert_rotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@

import time
from system_test import TestCase, main_module, Qdrouterd, unittest, retry
from system_test import CA_CERT, SSL_PROFILE_TYPE, CONNECTION_TYPE
from system_test import CA_CERT, SSL_PROFILE_TYPE, CONNECTION_TYPE, ROUTER_LINK_TYPE
from system_test import CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD
from system_test import SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD
from tcp_streamer import TcpStreamerThread


class InterRouterCertRotationTest(TestCase):
Expand Down Expand Up @@ -56,6 +57,23 @@ def wait_inter_router_conns(self, router, count):
len(self.get_inter_router_conns(rtr)) == ct)
self.assertTrue(ok, f"Failed to get {count} i.r. conns: {self.get_inter_router_conns(router)}")

def get_inter_router_data_conns(self, router):
    """
    Return the subset of the router's inter-router connections (as
    reported by get_inter_router_conns) whose 'role' attribute is
    'inter-router-data'. Order is preserved.
    """
    data_conns = []
    for conn in self.get_inter_router_conns(router):
        if conn['role'] == 'inter-router-data':
            data_conns.append(conn)
    return data_conns

def get_links_by_conn_id(self, router, conn_id):
    """
    Query the router's management agent for all ROUTER_LINK_TYPE entities
    and return those whose 'connectionId' attribute equals conn_id.
    """
    all_links = router.management.query(type=ROUTER_LINK_TYPE).get_dicts()
    return list(filter(lambda entry: entry['connectionId'] == conn_id, all_links))

def get_streaming_data_links(self, router):
    """
    Collect, across every inter-router-data connection on the router, the
    links attached to that connection whose 'linkType' is 'endpoint'.
    Links are returned grouped in connection order.
    """
    return [link
            for conn in self.get_inter_router_data_conns(router)
            for link in self.get_links_by_conn_id(router, conn['identity'])
            if link['linkType'] == 'endpoint']

def test_01_ordinal_updates(self):
"""
Verify that ordinal updates create new inter-router connections. Verify
Expand Down Expand Up @@ -194,6 +212,95 @@ def test_02_drop_old(self):
router_L.teardown()
router_C.teardown()

def test_03_tcp_streams(self):
"""
Verify that existing TCP streams are not interrupted when new
inter-router connections are established.

Scenario: start TCP streams between RouterL (tcpListener) and RouterC
(tcpConnector), raise the connector sslProfile's 'ordinal' so that a
second set of inter-router connections appears, check that data is
still flowing, then raise 'oldestValidOrdinal' and check that the
connection count drops back and the streamer terminates.
"""
# Passed to self.router() below. The test expects data_conn_count + 1
# inter-router connections per set (presumably the extra one is the
# inter-router control connection -- confirm).
data_conn_count = 4
inter_router_port = self.tester.get_port()
tcp_listener_port = self.tester.get_port()
tcp_connector_port = self.tester.get_port()

# RouterL: TLS inter-router listener plus the TCP listener that the
# streamer's clients connect to. Started without waiting (wait=False).
router_L = self.router("RouterL",
[('sslProfile', {'name': 'ListenerSslProfile',
'caCertFile': CA_CERT,
'certFile': SERVER_CERTIFICATE,
'privateKeyFile': SERVER_PRIVATE_KEY,
'password': SERVER_PRIVATE_KEY_PASSWORD}),
('listener', {'name': 'Listener01',
'role': 'inter-router',
'host': '0.0.0.0',
'port': inter_router_port,
'requireSsl': 'yes',
'sslProfile': 'ListenerSslProfile'}),
('tcpListener', {'name': 'tcpListener01',
'address': 'tcp/streaming',
'port': tcp_listener_port})],
data_conn_count, wait=False)
# RouterC: TLS inter-router connector to RouterL (sslProfile starts at
# ordinal 0 / oldestValidOrdinal 0) plus the TCP connector that reaches
# the streamer's server side.
router_C = self.router("RouterC",
[('sslProfile', {'name': "ConnectorSslProfile",
'ordinal': 0,
'oldestValidOrdinal': 0,
'caCertFile': CA_CERT,
'certFile': CLIENT_CERTIFICATE,
'privateKeyFile': CLIENT_PRIVATE_KEY,
'password': CLIENT_PRIVATE_KEY_PASSWORD}),
('connector', {'role': 'inter-router',
'host': 'localhost',
'port': inter_router_port,
'verifyHostname': 'yes',
'sslProfile': 'ConnectorSslProfile'}),
('tcpConnector', {'name': 'tcpConnector01',
'address': 'tcp/streaming',
'host': 'localhost',
'port': tcp_connector_port})],
data_conn_count, wait=True)
router_C.wait_router_connected("RouterL")

# wait for the inter-router connections to come up
self.wait_inter_router_conns(router_C, data_conn_count + 1)

# start TCP streaming connections across the routers
tcp_streamer = TcpStreamerThread(client_addr=('localhost', tcp_listener_port),
server_addr=('0.0.0.0', tcp_connector_port),
client_count=10, poll_timeout=0.2)

# Now wait until the streams have established (2 links per client) and
# traffic is passing. The expected 20 links is 2 * client_count (10).
ok = retry(lambda rtr=router_C:
len(self.get_streaming_data_links(rtr)) == 20)
self.assertTrue(ok, f"Failed to get 20 links: {self.get_streaming_data_links(router_C)}")
# Traffic is considered to be passing once the received-byte counter
# advances past the value sampled here.
begin_recv = tcp_streamer.bytes_received
ok = retry(lambda: tcp_streamer.bytes_received > begin_recv)
self.assertTrue(ok, f"Failed to stream data {tcp_streamer.bytes_received}")

# Now rotate the certs
# update tlsOrdinal to 3 and wait for new conns to appear
# (the expected count doubles: the original set plus the new set)
router_C.management.update(type=SSL_PROFILE_TYPE,
attributes={'ordinal': 3},
name='ConnectorSslProfile')
self.wait_inter_router_conns(router_C, 2 * (data_conn_count + 1))

# verify that the streamer is still running and the streams are still passing traffic
begin_recv = tcp_streamer.bytes_received
ok = retry(lambda: tcp_streamer.bytes_received > begin_recv)
self.assertTrue(ok, f"Failed to stream data {tcp_streamer.bytes_received}")
# NOTE(review): this assumes TcpStreamerThread.is_alive is a property or
# attribute; if it is a plain method this assertion is always true --
# confirm in tcp_streamer.py.
self.assertTrue(tcp_streamer.is_alive, "Streamer has failed!")

# Update oldestValidOrdinal to 3. Expect the connections that carry the
# streaming data to close, leaving a single set of inter-router
# connections and causing the streamer to stop.
router_C.management.update(type=SSL_PROFILE_TYPE,
attributes={'oldestValidOrdinal': 3},
name='ConnectorSslProfile')
self.wait_inter_router_conns(router_C, data_conn_count + 1)
ok = retry(lambda: tcp_streamer.is_alive is False)
self.assertTrue(ok, "Failed to terminate the streamer")

# Cleanup: stop both routers, then reap the streamer.
router_L.teardown()
router_C.teardown()
tcp_streamer.join()


if __name__ == '__main__':
unittest.main(main_module())
Loading

0 comments on commit 88f102c

Please sign in to comment.