Skip to content

Commit

Permalink
feat(blockstore): para shred pool / map and slice (entry batch) exec
Browse files Browse the repository at this point in the history
  • Loading branch information
lidatong authored and ibhatt-jumptrading committed Feb 6, 2025
1 parent ddc0dfb commit 164090a
Show file tree
Hide file tree
Showing 26 changed files with 915 additions and 860 deletions.
5 changes: 0 additions & 5 deletions src/app/backtest/Local.mk

This file was deleted.

60 changes: 0 additions & 60 deletions src/app/backtest/fd_backtest_ctl.c

This file was deleted.

8 changes: 5 additions & 3 deletions src/app/fdctl/run/tiles/fd_repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct fd_repair_tile_ctx {
fd_stem_context_t * stem;

fd_wksp_t * blockstore_wksp;
fd_blockstore_t blockstore_ljoin;
fd_blockstore_t * blockstore;

fd_keyguard_client_t keyguard_client[1];
Expand Down Expand Up @@ -524,7 +525,8 @@ unprivileged_init( fd_topo_t * topo,

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );
ctx->blockstore = &ctx->blockstore_ljoin;
ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );

void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
Expand Down Expand Up @@ -572,7 +574,7 @@ unprivileged_init( fd_topo_t * topo,
FD_LOG_ERR(( "no blocktore workspace" ));
}

ctx->blockstore = fd_blockstore_join( fd_topo_obj_laddr( topo, blockstore_obj_id ) );
ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
FD_TEST( ctx->blockstore!=NULL );

fd_topo_link_t * netmux_link = &topo->links[ tile->in_link_id[ 0 ] ];
Expand Down Expand Up @@ -662,7 +664,7 @@ populate_allowed_seccomp( fd_topo_t const * topo,
(void)topo;
(void)tile;

populate_sock_filter_policy_repair(
populate_sock_filter_policy_repair(
out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)tile->repair.good_peer_cache_file_fd );
return sock_filter_policy_repair_instr_cnt;
}
Expand Down
49 changes: 25 additions & 24 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ struct fd_replay_tile_ctx {

/* Depends on store_int and is polled in after_credit */

fd_blockstore_t * blockstore;
fd_blockstore_t blockstore_ljoin;
int blockstore_fd; /* file descriptor for archival file */
fd_blockstore_t * blockstore;

/* Updated during execution */

Expand Down Expand Up @@ -363,7 +364,7 @@ scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
l = FD_LAYOUT_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
}
l = FD_LAYOUT_APPEND( l, 128UL, FD_MBATCH_MAX );
l = FD_LAYOUT_APPEND( l, 128UL, FD_SLICE_MAX );
ulong thread_spad_size = fd_spad_footprint( FD_RUNTIME_TRANSACTION_EXECUTION_FOOTPRINT_DEFAULT );
l = FD_LAYOUT_APPEND( l, fd_spad_align(), tile->replay.tpool_thread_count * fd_ulong_align_up( thread_spad_size, fd_spad_align() ) );
l = FD_LAYOUT_APPEND( l, fd_spad_align(), FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT ); /* FIXME: make this configurable */
Expand Down Expand Up @@ -1135,7 +1136,7 @@ publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
msg->type = FD_REPLAY_SLOT_TYPE;
msg->slot_exec.slot = curr_slot;
msg->slot_exec.parent = ctx->parent_slot;
msg->slot_exec.root = ctx->blockstore->smr;
msg->slot_exec.root = ctx->blockstore->shmem->smr;
msg->slot_exec.height = ( block_map_entry ? block_map_entry->block_height : 0UL );
msg->slot_exec.transaction_count = fork->slot_ctx.slot_bank.transaction_count;
memcpy( &msg->slot_exec.bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof( fd_hash_t ) );
Expand Down Expand Up @@ -1609,11 +1610,11 @@ after_frag( fd_replay_tile_ctx_t * ctx,
fd_blockstore_end_read( ctx->blockstore );

if( FD_LIKELY( block_map_entry->data_complete_idx != FD_SHRED_IDX_NULL ) ) {
uint i = block_map_entry->replayed_idx + 1;
uint i = block_map_entry->consumed_idx + 1;
uint j = block_map_entry->data_complete_idx;

/* If this is the first batch being verified of this block, need to populate the slot_bank's tick height for tick verification */
if( FD_UNLIKELY( block_map_entry->replayed_idx + 1 == 0 ) ){
if( FD_UNLIKELY( block_map_entry->consumed_idx + 1 == 0 ) ){
FD_LOG_NOTICE(("Preparing first batch execution of slot %lu", ctx->curr_slot));
prepare_first_batch_execution( ctx, stem );
}
Expand All @@ -1629,15 +1630,12 @@ after_frag( fd_replay_tile_ctx_t * ctx,
required because txns can span multiple shreds. */
ulong mbatch_sz = 0;

fd_blockstore_start_read( ctx->blockstore );

int err = fd_blockstore_batch_assemble( ctx->blockstore,
ctx->curr_slot,
block_map_entry->replayed_idx + 1,
FD_MBATCH_MAX,
ctx->mbatch,
&mbatch_sz );
fd_blockstore_end_read( ctx->blockstore );
int err = fd_blockstore_slice_query( ctx->blockstore,
ctx->curr_slot,
block_map_entry->consumed_idx + 1,
FD_SLICE_MAX,
ctx->mbatch,
&mbatch_sz );

if( FD_UNLIKELY( err ) ){
FD_LOG_ERR(( "Failed to assemble microblock batch" ));
Expand All @@ -1648,10 +1646,13 @@ after_frag( fd_replay_tile_ctx_t * ctx,
// TODO: handle invalid batch how & do thread handling
FD_LOG_ERR(( "Failed to process microblock batch" ));
}
if( FD_UNLIKELY( idx == block_map_entry->slot_complete_idx ) ) {
for( uint idx = 0; idx <= block_map_entry->slot_complete_idx; idx++ ) {
fd_blockstore_shred_remove( ctx->blockstore, ctx->curr_slot, idx );
}
}
//replay_mbatch( ctx, block_map_entry->replayed_idx + 1 );
fd_blockstore_start_write( ctx->blockstore );
block_map_entry->replayed_idx = idx;
fd_blockstore_end_write( ctx->blockstore );
block_map_entry->consumed_idx = idx;
}
}
}
Expand Down Expand Up @@ -1766,7 +1767,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,
if( FD_LIKELY( block_ ) ) {
block_map_entry->flags = fd_uchar_set_bit( block_map_entry->flags, FD_BLOCK_FLAG_PROCESSED );
block_map_entry->flags = fd_uchar_clear_bit( block_map_entry->flags, FD_BLOCK_FLAG_REPLAYING );
ctx->blockstore->lps = block_map_entry->slot;
ctx->blockstore->shmem->lps = block_map_entry->slot;
memcpy( &block_map_entry->bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof( fd_hash_t ) );
}
fd_blockstore_end_write( ctx->blockstore );
Expand Down Expand Up @@ -1822,7 +1823,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,
break;
}
s = block_map_entry->parent_slot;
if( s < ctx->blockstore->smr ) {
if( s < ctx->blockstore->shmem->smr ) {
break;
}
*(ulong*)(msg + 24U + i*8U) = s;
Expand Down Expand Up @@ -2476,7 +2477,7 @@ during_housekeeping( void * _ctx ) {
root and blockstore smr. */

fd_blockstore_start_read( ctx->blockstore );
ulong wmk = fd_ulong_min( ctx->root, ctx->blockstore->smr );
ulong wmk = fd_ulong_min( ctx->root, ctx->blockstore->shmem->smr );
fd_blockstore_end_read( ctx->blockstore );

if ( FD_LIKELY( wmk <= fd_fseq_query( ctx->wmk ) ) ) return;
Expand Down Expand Up @@ -2557,7 +2558,7 @@ unprivileged_init( fd_topo_t * topo,
for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
ctx->bmtree[i] = FD_SCRATCH_ALLOC_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
}
void * mbatch_mem = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_MBATCH_MAX );
void * mbatch_mem = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_SLICE_MAX );
ulong thread_spad_size = fd_spad_footprint( FD_RUNTIME_TRANSACTION_EXECUTION_FOOTPRINT_DEFAULT );
void * spad_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), tile->replay.tpool_thread_count * fd_ulong_align_up( thread_spad_size, fd_spad_align() ) + FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT );
ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
Expand All @@ -2574,7 +2575,6 @@ unprivileged_init( fd_topo_t * topo,
/**********************************************************************/

ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
ctx->blockstore = NULL;

ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
FD_TEST( blockstore_obj_id!=ULONG_MAX );
Expand All @@ -2583,8 +2583,9 @@ unprivileged_init( fd_topo_t * topo,
FD_LOG_ERR(( "no blockstore wksp" ));
}

ctx->blockstore = fd_blockstore_join( fd_topo_obj_laddr( topo, blockstore_obj_id ) );
FD_TEST( ctx->blockstore!=NULL );
ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
fd_buf_shred_pool_reset( ctx->blockstore->shred_pool, 0 );
FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );

ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
FD_TEST( status_cache_obj_id != ULONG_MAX );
Expand Down
2 changes: 1 addition & 1 deletion src/app/fdctl/run/tiles/fd_rpcserv.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ privileged_init( fd_topo_t * topo,
/* Blockstore setup */
ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
FD_TEST( blockstore_obj_id!=ULONG_MAX );
args->blockstore = fd_blockstore_join( fd_topo_obj_laddr( topo, blockstore_obj_id ) );
args->blockstore = fd_blockstore_join( &args->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
FD_TEST( args->blockstore!=NULL );
ctx->blockstore_fd = open( tile->replay.blockstore_file, O_RDONLY );
if ( FD_UNLIKELY(ctx->blockstore_fd == -1) ){
Expand Down
29 changes: 16 additions & 13 deletions src/app/fdctl/run/tiles/fd_store_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ struct fd_store_tile_ctx {
fd_pubkey_t identity_key[1]; /* Just the public key */

fd_store_t * store;
fd_blockstore_t * blockstore;
fd_blockstore_t blockstore_ljoin;
int blockstore_fd; /* file descriptor for archival file */
fd_blockstore_t * blockstore;

fd_wksp_t * stake_in_mem;
ulong stake_in_chunk0;
Expand Down Expand Up @@ -271,13 +272,13 @@ after_frag( fd_store_tile_ctx_t * ctx,
return;
}

if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_OK ) {
if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
FD_LOG_ERR(( "failed inserting to blockstore" ));
} else if ( ctx->shred_cap_ctx.is_archive ) {
uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_REPAIR(0);
if ( fd_shred_cap_archive(&ctx->shred_cap_ctx, shred, shred_cap_flag) < FD_SHRED_CAP_OK ) {
FD_LOG_ERR(( "failed at archiving repair shred to file" ));
}
uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_REPAIR( 0 );
if( fd_shred_cap_archive( &ctx->shred_cap_ctx, shred, shred_cap_flag ) < FD_SHRED_CAP_OK ) {
FD_LOG_ERR( ( "failed at archiving repair shred to file" ) );
}
}
return;
}
Expand Down Expand Up @@ -310,16 +311,16 @@ after_frag( fd_store_tile_ctx_t * ctx,
}
// TODO: improve return value of api to not use < OK

if( fd_store_shred_insert( ctx->store, &ctx->s34_buffer->pkts[i].shred ) < FD_BLOCKSTORE_OK ) {
if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
FD_LOG_ERR(( "failed inserting to blockstore" ));
} else if ( ctx->shred_cap_ctx.is_archive ) {
uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_TURBINE(0);
if ( fd_shred_cap_archive(&ctx->shred_cap_ctx, &ctx->s34_buffer->pkts[i].shred, shred_cap_flag) < FD_SHRED_CAP_OK ) {
if ( fd_shred_cap_archive(&ctx->shred_cap_ctx, shred, shred_cap_flag) < FD_SHRED_CAP_OK ) {
FD_LOG_ERR(( "failed at archiving turbine shred to file" ));
}
}

fd_store_shred_update_with_shred_from_turbine( ctx->store, &ctx->s34_buffer->pkts[i].shred );
fd_store_shred_update_with_shred_from_turbine( ctx->store, shred );
}
}

Expand Down Expand Up @@ -475,7 +476,7 @@ after_credit( fd_store_tile_ctx_t * ctx,

if( FD_UNLIKELY( ctx->sim &&
ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) {
FD_LOG_WARNING(( "Sim is complete." ));
// FD_LOG_WARNING(( "Sim is complete." ));
}

for( ulong i = 0; i<fd_txn_iter_map_slot_cnt(); i++ ) {
Expand Down Expand Up @@ -549,6 +550,7 @@ unprivileged_init( fd_topo_t * topo,

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
ctx->blockstore = &ctx->blockstore_ljoin;
// TODO: set the lo_mark_slot to the actual snapshot slot!
ctx->store = fd_store_join( fd_store_new( FD_SCRATCH_ALLOC_APPEND( l, fd_store_align(), fd_store_footprint() ), 1 ) );
ctx->repair_req_buffer = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
Expand Down Expand Up @@ -591,7 +593,7 @@ unprivileged_init( fd_topo_t * topo,
do {
expected_shred_version = fd_fseq_query( gossip_shred_version );
} while( expected_shred_version==ULONG_MAX );
FD_LOG_INFO(( "using shred version %lu", expected_shred_version ));
FD_LOG_NOTICE(( "using shred version %lu", expected_shred_version ));
}
if( FD_UNLIKELY( expected_shred_version>USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
FD_TEST( expected_shred_version );
Expand All @@ -608,7 +610,7 @@ unprivileged_init( fd_topo_t * topo,
ulong tag = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.wksp_tag", blockstore_obj_id );
if( FD_LIKELY( fd_wksp_tag_query( ctx->blockstore_wksp, &tag, 1, &info, 1 ) > 0 ) ) {
void * blockstore_mem = fd_wksp_laddr_fast( ctx->blockstore_wksp, info.gaddr_lo );
ctx->blockstore = fd_blockstore_join( blockstore_mem );
ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_mem );
} else {
FD_LOG_WARNING(( "failed to find blockstore in workspace. making new blockstore." ));
}
Expand All @@ -618,7 +620,7 @@ unprivileged_init( fd_topo_t * topo,
FD_LOG_ERR(( "failed to find blockstore" ));
}

ctx->blockstore = fd_blockstore_join( blockstore_shmem );
ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_shmem );
}

FD_LOG_NOTICE(( "blockstore: %s", tile->store_int.blockstore_file ));
Expand Down Expand Up @@ -736,6 +738,7 @@ unprivileged_init( fd_topo_t * topo,
ctx->sim = 1;
FD_TEST( fd_shred_cap_replay( tile->store_int.shred_cap_replay, ctx->store ) == FD_SHRED_CAP_OK );
}

}

static ulong
Expand Down
Loading

0 comments on commit 164090a

Please sign in to comment.