Skip to content

Commit

Permalink
transparent structure, iteration macro
Browse files Browse the repository at this point in the history
  • Loading branch information
akhinvasara-jumptrading committed Feb 6, 2025
1 parent 34c140e commit 3c46e30
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 152 deletions.
118 changes: 40 additions & 78 deletions src/waltz/quic/fd_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,6 @@ fd_quic_conn_new_stream( fd_quic_conn_t * conn ) {
next_stream_id >= peer_sup_stream_id ) ) {
/* this is a normal condition which occurs whenever we run up to
the peer advertised limit and represents one form of flow control */
FD_LOG_NOTICE(( "Inactive connection state %u", conn->state ));
return NULL;
}

Expand Down Expand Up @@ -1322,32 +1321,15 @@ fd_quic_abandon_enc_level( fd_quic_conn_t * conn,
conn->keys_avail = fd_uint_clear_bit( conn->keys_avail, (int)j );

/* treat all packets as ACKed (freeing handshake data, etc.) */
fd_quic_pkt_meta_treap_t * sent = &trackers->sent_pkt_metas[j];
fd_quic_pkt_meta_ds_t * sent = &trackers->sent_pkt_metas[j];
fd_quic_pkt_meta_t * pool = trackers->pkt_meta_pool_join;
fd_quic_pkt_meta_t * prev = NULL;

// TODO AMAN - rewrite prev handling
for( fd_quic_pkt_meta_treap_fwd_iter_t iter = fd_quic_pkt_meta_treap_fwd_iter_init( sent, pool );
!fd_quic_pkt_meta_treap_fwd_iter_done( iter );
iter = fd_quic_pkt_meta_treap_fwd_iter_next( iter, pool ) ) {

if( prev ) {
fd_quic_pkt_meta_pool_ele_release( pool, prev );
}

fd_quic_pkt_meta_t * e = fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pool );
fd_quic_reclaim_pkt_meta( conn, e, j );
prev = e;
}

if( prev ) {
fd_quic_pkt_meta_pool_ele_release( pool, prev );
}

ulong ele_max = fd_quic_pkt_meta_treap_ele_max( sent );
fd_quic_pkt_meta_treap_delete( sent );
fd_quic_pkt_meta_treap_new( &(trackers->sent_pkt_metas[j]), ele_max );
FD_QUIC_PKT_META_PROCESS_FROM_BEGIN( fd_quic_reclaim_pkt_meta( conn, e, j ),
fd_quic_pkt_meta_pool_ele_release( pool, prev ),
0,
sent,
pool );

fd_quic_pkt_meta_clear( trackers, j );
}
}

Expand Down Expand Up @@ -4898,24 +4880,17 @@ void
fd_quic_process_lost( fd_quic_conn_t * conn, uint enc_level, ulong cnt ) {
/* start at oldest sent */
fd_quic_pkt_meta_trackers_t * trackers = &conn->pkt_meta_trackers;
fd_quic_pkt_meta_treap_t * sent = &trackers->sent_pkt_metas[enc_level];
fd_quic_pkt_meta_ds_t * sent = &trackers->sent_pkt_metas[enc_level];
fd_quic_pkt_meta_t * pool = trackers->pkt_meta_pool_join;
ulong j = 0;
for( fd_quic_pkt_meta_treap_fwd_iter_t iter = fd_quic_pkt_meta_treap_fwd_iter_init( sent, pool );
!fd_quic_pkt_meta_treap_fwd_iter_done( iter );
iter = fd_quic_pkt_meta_treap_fwd_iter_next( iter, pool ) ) {

fd_quic_pkt_meta_t * pkt_meta = fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pool );
if( FD_LIKELY( j < cnt ) ) {
pkt_meta->expiry = 0; /* force expiry */
} else {
break;
}

j++;
}

/* do the processing */
/* mark immediate expiry */
FD_QUIC_PKT_META_PROCESS_FROM_BEGIN( e->expiry = 0; j++,
,
FD_UNLIKELY( j > cnt ),
sent,
pool);
/* do the retry processing */
fd_quic_pkt_meta_retry( conn->quic, conn, 0 /* don't force */, enc_level );
}

Expand All @@ -4930,10 +4905,8 @@ fd_quic_process_ack_range( fd_quic_conn_t * conn,
int is_largest,
ulong now,
ulong ack_delay ) {
/* FIXME: This would benefit from algorithmic improvements */
/* FIXME: Close connection if peer ACKed a higher packet number than we sent */


fd_quic_pkt_t * pkt = context->pkt;

/* inclusive range */
Expand All @@ -4943,34 +4916,23 @@ fd_quic_process_ack_range( fd_quic_conn_t * conn,

/* start at oldest sent */
fd_quic_pkt_meta_trackers_t * trackers = &conn->pkt_meta_trackers;
fd_quic_pkt_meta_treap_t * sent = &trackers->sent_pkt_metas[enc_level];
fd_quic_pkt_meta_ds_t * sent = &trackers->sent_pkt_metas[enc_level];
fd_quic_pkt_meta_t * pool = trackers->pkt_meta_pool_join;
fd_quic_pkt_meta_t * pkt_meta = pool + fd_quic_pkt_meta_treap_idx_lower_bound( sent, lo, pool );

while( pkt_meta ) {
if( FD_UNLIKELY( pkt_meta->pkt_number > hi ) ) {
break;
}

fd_quic_pkt_meta_t * pkt_meta_next = pool + pkt_meta->next;

/* note: rtt_pkt_number is zero when unused, so using >= for the test */
if( is_largest && pkt_meta->pkt_number == hi && hi >= pkt->rtt_pkt_number ) {
pkt->rtt_pkt_number = hi;
pkt->rtt_ack_time = now - pkt_meta->tx_time; /* in ticks */
pkt->rtt_ack_delay = ack_delay; /* in peer units */
}

fd_quic_reclaim_pkt_meta( conn,
pkt_meta,
enc_level );

pkt_meta = pkt_meta_next;

}
FD_QUIC_PKT_META_PROCESS( if( is_largest && e->pkt_number == hi && hi >= pkt->rtt_pkt_number ) {
pkt->rtt_pkt_number = hi;
pkt->rtt_ack_time = now - e->tx_time; /* in ticks */
pkt->rtt_ack_delay = ack_delay; /* in peer units */
}
fd_quic_reclaim_pkt_meta( conn, e, enc_level ),
,
FD_UNLIKELY( e->pkt_number > hi ),
sent,
pool,
fd_quic_pkt_meta_ds_idx_lower_bound( sent, lo, pool ) );
/* careful! assumes pool idx <--> iterator */

fd_quic_pkt_meta_remove_range( sent, pool, lo, hi );

}

static ulong
Expand Down Expand Up @@ -5080,19 +5042,19 @@ fd_quic_handle_ack_frame(
/* process lost packets */
{
fd_quic_pkt_meta_trackers_t * trackers = &conn->pkt_meta_trackers;
fd_quic_pkt_meta_treap_t * sent = &trackers->sent_pkt_metas[enc_level];
fd_quic_pkt_meta_ds_t * sent = &trackers->sent_pkt_metas[enc_level];
fd_quic_pkt_meta_t * pool = trackers->pkt_meta_pool_join;
fd_quic_pkt_meta_treap_fwd_iter_t iter = fd_quic_pkt_meta_treap_fwd_iter_init( sent, pool );
fd_quic_pkt_meta_t * pkt_meta = fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pool );

/* FIXME - duplicate iteration here and in process_lost */
if( FD_UNLIKELY( pkt_meta && pkt_meta->pkt_number < low_ack_pkt_number ) ) {
ulong skipped = 0UL;
while( FD_LIKELY( pkt_meta && pkt_meta->pkt_number < low_ack_pkt_number ) ) {
skipped++;
iter = fd_quic_pkt_meta_treap_fwd_iter_next( iter, pool );
pkt_meta = fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pool );
}

ulong iter = fd_quic_pkt_meta_ds_fwd_iter_init( sent, pool );
fd_quic_pkt_meta_t * starting_meta = fd_quic_pkt_meta_ds_fwd_iter_ele( iter, pool );

if ( FD_UNLIKELY( starting_meta && starting_meta->pkt_number < low_ack_pkt_number ) ) {
ulong skipped = 0;
FD_QUIC_PKT_META_PROCESS_FROM_BEGIN(skipped++;,
,
FD_UNLIKELY( e->pkt_number >= low_ack_pkt_number ),
sent,
pool);

if( FD_UNLIKELY( skipped > 3 ) ) {
fd_quic_process_lost( conn, enc_level, skipped - 3 );
Expand Down
72 changes: 35 additions & 37 deletions src/waltz/quic/fd_quic_pkt_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

#define FD_QUIC_MAX_INFLIGHT_LOW_ENC 20
static void *
fd_quic_pkt_meta_treaps_init( fd_quic_pkt_meta_treap_t * sent_pkt_metas,
ulong appdata_max_ele ) {
fd_quic_pkt_meta_ds_init( fd_quic_pkt_meta_ds_t * sent_pkt_metas,
ulong appdata_max_ele ) {

for( ulong enc_level=0; enc_level<4; enc_level++ ) {
void* mem = fd_quic_pkt_meta_treap_new( sent_pkt_metas+enc_level,
Expand Down Expand Up @@ -31,52 +31,50 @@ fd_quic_pkt_meta_trackers_init( fd_quic_pkt_meta_trackers_t * trackers,

/* everything not saved for lower encryption levels goes to appdata */
ulong appdata_max_ele = total_meta_cnt - 3*FD_QUIC_MAX_INFLIGHT_LOW_ENC;
if( FD_UNLIKELY( !fd_quic_pkt_meta_treaps_init( trackers->sent_pkt_metas, appdata_max_ele ) ) ) return NULL;
if( FD_UNLIKELY( !fd_quic_pkt_meta_ds_init( trackers->sent_pkt_metas, appdata_max_ele ) ) ) return NULL;
return shpool;
}
#undef FD_QUIC_MAX_INFLIGHT_LOW_ENC

void
fd_quic_pkt_meta_insert( fd_quic_pkt_meta_treap_t * treap,
fd_quic_pkt_meta_t * pkt_meta,
fd_quic_pkt_meta_t * pool ) {
fd_quic_pkt_meta_treap_ele_insert( treap, pkt_meta, pool );
fd_quic_pkt_meta_insert( fd_quic_pkt_meta_ds_t * ds,
fd_quic_pkt_meta_t * pkt_meta,
fd_quic_pkt_meta_t * pool ) {
fd_quic_pkt_meta_treap_ele_insert( ds, pkt_meta, pool );
}


void
fd_quic_pkt_meta_remove_range( fd_quic_pkt_meta_treap_t * treap,
fd_quic_pkt_meta_t * pool,
ulong pkt_number_lo,
ulong pkt_number_hi ) {

fd_quic_pkt_meta_treap_fwd_iter_t iter = fd_quic_pkt_meta_treap_idx_lower_bound( treap, pkt_number_lo, pool );
fd_quic_pkt_meta_t * prev = NULL;

// AMAN - rewrite this for cleanliness
while ( !fd_quic_pkt_meta_treap_fwd_iter_done( iter ) ) {
if ( FD_LIKELY( prev ) ) {
fd_quic_pkt_meta_treap_ele_remove( treap, prev, pool );
fd_quic_pkt_meta_pool_ele_release( pool, prev );
}
fd_quic_pkt_meta_t* pkt_meta = fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pool );
if ( FD_UNLIKELY( pkt_meta->pkt_number > pkt_number_hi ) ) {
break;
}
prev = pkt_meta;
iter = fd_quic_pkt_meta_treap_fwd_iter_next( iter, pool );
}
if ( FD_LIKELY( prev ) ) {
fd_quic_pkt_meta_treap_ele_remove( treap, prev, pool );
fd_quic_pkt_meta_pool_ele_release( pool, prev );
}
fd_quic_pkt_meta_remove_range( fd_quic_pkt_meta_ds_t * ds,
fd_quic_pkt_meta_t * pool,
ulong pkt_number_lo,
ulong pkt_number_hi ) {

fd_quic_pkt_meta_treap_fwd_iter_t l_iter = fd_quic_pkt_meta_treap_idx_lower_bound( ds, pkt_number_lo, pool );

FD_QUIC_PKT_META_PROCESS(
,
{fd_quic_pkt_meta_treap_ele_remove( ds, prev, pool );
fd_quic_pkt_meta_pool_ele_release( pool, prev );},
FD_UNLIKELY( e->pkt_number > pkt_number_hi ),
ds,
pool,
l_iter
);
}

fd_quic_pkt_meta_t *
fd_quic_pkt_meta_min( fd_quic_pkt_meta_treap_t * treap,
fd_quic_pkt_meta_t * pkt_meta_pool ) {
fd_quic_pkt_meta_min( fd_quic_pkt_meta_ds_t * ds,
fd_quic_pkt_meta_t * pool ) {

fd_quic_pkt_meta_treap_fwd_iter_t iter = fd_quic_pkt_meta_treap_fwd_iter_init( treap, pkt_meta_pool );
if( FD_UNLIKELY( fd_quic_pkt_meta_treap_fwd_iter_done( iter ) ) ) return NULL;
return fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pkt_meta_pool );
fd_quic_pkt_meta_ds_fwd_iter_t iter = fd_quic_pkt_meta_ds_fwd_iter_init( ds, pool );
if( FD_UNLIKELY( fd_quic_pkt_meta_ds_fwd_iter_done( iter ) ) ) return NULL;
return fd_quic_pkt_meta_ds_fwd_iter_ele( iter, pool );
}

void
fd_quic_pkt_meta_clear( fd_quic_pkt_meta_trackers_t * trackers,
uint enc_level ) {
ulong ele_max = fd_quic_pkt_meta_treap_ele_max( trackers->sent_pkt_metas+enc_level );
fd_quic_pkt_meta_treap_new( trackers->sent_pkt_metas+enc_level, ele_max );
}
92 changes: 79 additions & 13 deletions src/waltz/quic/fd_quic_pkt_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,40 +118,106 @@ struct fd_quic_pkt_meta {
#define POOL_T fd_quic_pkt_meta_t
#include "../../util/tmpl/fd_pool.c"

/* alias for transparent ds */
typedef fd_quic_pkt_meta_treap_t fd_quic_pkt_meta_ds_t;
typedef fd_quic_pkt_meta_treap_fwd_iter_t fd_quic_pkt_meta_ds_fwd_iter_t;

static inline fd_quic_pkt_meta_ds_fwd_iter_t
fd_quic_pkt_meta_ds_fwd_iter_init( fd_quic_pkt_meta_ds_t * treap,
fd_quic_pkt_meta_t * pool ) {
return fd_quic_pkt_meta_treap_fwd_iter_init( treap, pool );
}

static inline fd_quic_pkt_meta_t *
fd_quic_pkt_meta_ds_fwd_iter_ele( fd_quic_pkt_meta_ds_fwd_iter_t iter,
fd_quic_pkt_meta_t * pool ) {
return fd_quic_pkt_meta_treap_fwd_iter_ele( iter, pool );
}

static inline fd_quic_pkt_meta_ds_fwd_iter_t
fd_quic_pkt_meta_ds_fwd_iter_next( fd_quic_pkt_meta_ds_fwd_iter_t iter,
fd_quic_pkt_meta_t * pool ) {
return fd_quic_pkt_meta_treap_fwd_iter_next( iter, pool );
}

static inline int
fd_quic_pkt_meta_ds_fwd_iter_done( fd_quic_pkt_meta_ds_fwd_iter_t iter ) {
return fd_quic_pkt_meta_treap_fwd_iter_done( iter );
}

static inline ulong
fd_quic_pkt_meta_ds_idx_lower_bound( fd_quic_pkt_meta_ds_t * treap,
ulong pkt_number,
fd_quic_pkt_meta_t * pool ) {
return fd_quic_pkt_meta_treap_idx_lower_bound( treap, pkt_number, pool );
}

/* end transparent ds */

struct fd_quic_pkt_meta_trackers {
fd_quic_pkt_meta_treap_t sent_pkt_metas[4];
fd_quic_pkt_meta_ds_t sent_pkt_metas[4];
fd_quic_pkt_meta_t * pkt_meta_mem; /* owns the memory for fd_pool of pkt_meta */
fd_quic_pkt_meta_t * pkt_meta_pool_join;
};


FD_PROTOTYPES_BEGIN
/*
process the pkt_meta in the chosen DS
cb verbatim executed with current pkt_meta named 'e'
prev_cb verbatim executed with previous pkt_meta named 'prev'
condition is the condition to stop processing
*/
#define FD_QUIC_PKT_META_PROCESS( cb, prev_cb, condition, sent, pool, start ) \
do { \
fd_quic_pkt_meta_t * prev = NULL; \
for( fd_quic_pkt_meta_ds_fwd_iter_t iter = start; \
!fd_quic_pkt_meta_ds_fwd_iter_done( iter ); \
iter = fd_quic_pkt_meta_ds_fwd_iter_next( iter, pool ) ) { \
fd_quic_pkt_meta_t * e = fd_quic_pkt_meta_ds_fwd_iter_ele( iter, pool ); \
if ( condition ) { \
break; \
} \
if ( prev ) { \
prev_cb; \
} \
cb; \
prev = e; \
} \
if( prev ) { \
prev_cb; \
} \
} while( 0 );

#define FD_QUIC_PKT_META_PROCESS_FROM_BEGIN( cb, prev_cb, condition, sent, pool) \
FD_QUIC_PKT_META_PROCESS( cb, prev_cb, condition, sent, pool, fd_quic_pkt_meta_ds_fwd_iter_init( sent, pool ) )

void *
fd_quic_pkt_meta_trackers_init( fd_quic_pkt_meta_trackers_t * trackers,
fd_quic_pkt_meta_t * pkt_meta_mem,
ulong total_meta_cnt );

void
fd_quic_pkt_meta_insert( fd_quic_pkt_meta_treap_t * treap,
fd_quic_pkt_meta_t * pkt_meta,
fd_quic_pkt_meta_t * pool );
fd_quic_pkt_meta_insert( fd_quic_pkt_meta_ds_t * ds,
fd_quic_pkt_meta_t * pkt_meta,
fd_quic_pkt_meta_t * pool );

/*
remove all pkt_meta in the range [pkt_number_lo, pkt_number_hi]
rm from treap and return to pool
*/
void
fd_quic_pkt_meta_remove_range( fd_quic_pkt_meta_treap_t * treap,
fd_quic_pkt_meta_t * pool,
ulong pkt_number_lo,
ulong pkt_number_hi );
fd_quic_pkt_meta_remove_range( fd_quic_pkt_meta_ds_t * ds,
fd_quic_pkt_meta_t * pool,
ulong pkt_number_lo,
ulong pkt_number_hi );

fd_quic_pkt_meta_t *
fd_quic_pkt_meta_min( fd_quic_pkt_meta_treap_t * treap,
fd_quic_pkt_meta_t * pkt_meta_pool );
fd_quic_pkt_meta_min( fd_quic_pkt_meta_ds_t * ds,
fd_quic_pkt_meta_t * pool );

void
fd_quic_pkt_meta_clear( fd_quic_pkt_meta_trackers_t * trackers,
uint enc_level );

FD_PROTOTYPES_END

#endif // HEADER_fd_src_waltz_quic_fd_quic_pkt_meta_h

3 changes: 2 additions & 1 deletion src/waltz/quic/tests/test_quic_bw.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ main( int argc,
service_server( server_quic );

client_stream = fd_quic_conn_new_stream( client_conn );
if( !client_stream ) continue;
if( client_conn->state != FD_QUIC_CONN_STATE_ACTIVE ) { FD_LOG_NOTICE(( "Early break, inactive connection")); break;}
else if( !client_stream ) continue;
fd_quic_stream_send( client_stream, buf, sz, 1 );

long t = fd_log_wallclock();
Expand Down
Loading

0 comments on commit 3c46e30

Please sign in to comment.