Skip to content

Commit

Permalink
feat: store signer messages in local sqlite database (#664)
Browse files Browse the repository at this point in the history
Creates a local sqlite DB for signer messages (`stacks_signers.sqlite`)
and stores all messages in it so we can later retrieve them via
predicate scans.
  • Loading branch information
rafaelcr authored Oct 25, 2024
1 parent 71364c1 commit d12acd9
Show file tree
Hide file tree
Showing 13 changed files with 760 additions and 105 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components/chainhook-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
] }
tokio = { version = "1.38.1", features = ["full"] }
rusqlite = { version = "0.31.0", features = ["bundled"] }
slog = { version = "2.7.0" }
futures-util = "0.3.24"
flate2 = "1.0.24"
tar = "0.4.38"
Expand Down
9 changes: 4 additions & 5 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::storage::{
delete_confirmed_entry_from_stacks_blocks, delete_unconfirmed_entry_from_stacks_blocks,
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_unconfirmed_entry_in_stacks_blocks,
is_stacks_block_present, open_readonly_stacks_db_conn, open_readonly_stacks_db_conn_with_retry,
open_readwrite_stacks_db_conn, set_last_confirmed_insert_key,
is_stacks_block_present, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
set_last_confirmed_insert_key, StacksDbConnections,
};
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecification;
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap;
Expand Down Expand Up @@ -547,15 +547,14 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
)
.await;
// Refresh DB connection so it picks up recent changes made by TSV consolidation.
let new_conn = open_readonly_stacks_db_conn_with_retry(
let mut db_conns = StacksDbConnections::open_readonly(
&config.expected_cache_path(),
5,
&ctx,
)?;
scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
None,
&new_conn,
&mut db_conns,
&config,
None,
&ctx,
Expand Down
29 changes: 21 additions & 8 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@ use crate::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
signers::get_signer_db_messages_received_at_block, StacksDbConnections,
},
};
use chainhook_sdk::types::{BlockIdentifier, Chain};
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
utils::Context,
};
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_predicate_on_non_consensus_events,
types::{BlockIdentifier, Chain},
};
use chainhook_sdk::{
chainhooks::stacks::{
handle_stacks_hook_action, StacksChainhookInstance, StacksChainhookOccurrence,
StacksTriggerChainhook,
},
utils::{file_append, send_request, AbstractStacksBlock},
};
use rocksdb::DB;

use super::common::PredicateScanResult;

Expand Down Expand Up @@ -180,11 +183,12 @@ pub async fn get_canonical_fork_from_tsv(
pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
predicate_spec: &StacksChainhookInstance,
unfinished_scan_data: Option<ScanningData>,
stacks_db_conn: &DB,
db_conns: &mut StacksDbConnections,
config: &Config,
kill_signal: Option<Arc<RwLock<bool>>>,
ctx: &Context,
) -> Result<PredicateScanResult, String> {
let stacks_db_conn = &db_conns.stacks_db;
let predicate_uuid = &predicate_spec.uuid;
let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
Expand Down Expand Up @@ -327,20 +331,28 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
last_block_scanned = block_data.block_identifier.clone();

let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data];

let (hits_per_blocks, _predicates_expired) =
evaluate_stacks_chainhook_on_blocks(blocks, predicate_spec, ctx);

if hits_per_blocks.is_empty() {
let events = get_signer_db_messages_received_at_block(
&mut db_conns.signers_db,
&block_data.block_identifier,
)?;
let (hits_per_events, _) = evaluate_stacks_predicate_on_non_consensus_events(
&events,
predicate_spec,
ctx,
);

if hits_per_blocks.is_empty() && hits_per_events.is_empty() {
continue;
}

let trigger = StacksTriggerChainhook {
chainhook: predicate_spec,
apply: hits_per_blocks,
rollback: vec![],
// TODO(rafaelcr): Query for non consensus events which fall between block timestamps to fill in here
events: vec![]
events: hits_per_events,
};
let res = match handle_stacks_hook_action(
trigger,
Expand Down Expand Up @@ -536,7 +548,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
apply: hits_per_blocks,
rollback: vec![],
// TODO(rafaelcr): Consider StackerDB chunks that come from TSVs.
events: vec![]
events: vec![],
};
match handle_stacks_hook_action(trigger, &proofs, &config.get_event_observer_config(), ctx)
{
Expand Down Expand Up @@ -646,6 +658,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
}
};

// TODO(rafaelcr): Store signer messages
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?;

if blocks_inserted % 2500 == 0 {
Expand Down
81 changes: 50 additions & 31 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::signers::{initialize_signers_db, store_signer_db_messages};
use crate::storage::{
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks,
get_last_block_height_inserted, open_readonly_stacks_db_conn_with_retry,
Expand All @@ -19,6 +20,7 @@ use chainhook_sdk::observer::{
PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData,
StacksObserverStartupContext,
};
use chainhook_sdk::{try_error, try_info};
use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent};
use chainhook_sdk::utils::Context;
use redis::{Commands, Connection};
Expand Down Expand Up @@ -152,10 +154,12 @@ impl Service {
}
}

initialize_signers_db(&self.config.expected_cache_path(), &self.ctx)
.map_err(|e| format!("unable to initialize signers db: {e}"))?;

let (observer_command_tx, observer_command_rx) =
observer_commands_tx_rx.unwrap_or(channel());
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
// let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel();

let mut event_observer_config = self.config.get_event_observer_config();
event_observer_config.registered_chainhooks = chainhook_store;
Expand Down Expand Up @@ -441,12 +445,14 @@ impl Service {
data,
) => {
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireBitcoinPredicate(
Expand All @@ -466,12 +472,14 @@ impl Service {
data,
) => {
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireBitcoinPredicate(
Expand Down Expand Up @@ -547,10 +555,16 @@ impl Service {
};
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {},
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
StacksChainEvent::ChainUpdatedWithNonConsensusEvents(data) => {
// TODO(rafaelcr): Store signer data.
println!("signer message: {:?}", data);
if let Err(e) = store_signer_db_messages(
&self.config.expected_cache_path(),
&data.events,
&self.ctx,
) {
try_error!(self.ctx, "unable to store signer messages: {e}");
};
try_info!(self.ctx, "Stored {} stacks non-consensus events", data.events.len());
}
},
Err(e) => {
Expand All @@ -574,12 +588,14 @@ impl Service {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireStacksPredicate(
Expand All @@ -597,12 +613,14 @@ impl Service {
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireStacksPredicate(
Expand All @@ -619,10 +637,10 @@ impl Service {
}
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {},
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
StacksChainEvent::ChainUpdatedWithNonConsensusEvents(_) => {
// TODO(rafaelcr): Expire signer message predicates when appropriate
},
}
};
update_status_from_report(
Chain::Stacks,
Expand All @@ -640,7 +658,8 @@ impl Service {
&mut self.config,
&self.ctx,
)
.await {
.await
{
error!(
self.ctx.expect_logger(),
"Failed to update database from archive: {e}"
Expand Down
7 changes: 3 additions & 4 deletions components/chainhook-cli/src/service/runloops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use crate::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, common::PredicateScanResult,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
},
service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status},
storage::open_readonly_stacks_db_conn,
service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, storage::StacksDbConnections,
};

use super::ScanningData;
Expand Down Expand Up @@ -54,7 +53,7 @@ pub fn start_stacks_scan_runloop(
let kill_signal = Arc::new(RwLock::new(false));
kill_signals.insert(predicate_spec.uuid.clone(), kill_signal.clone());
stacks_scan_pool.execute(move || {
let stacks_db_conn = match open_readonly_stacks_db_conn(
let mut db_conns = match StacksDbConnections::open_readonly(
&moved_config.expected_cache_path(),
&moved_ctx,
) {
Expand All @@ -75,7 +74,7 @@ pub fn start_stacks_scan_runloop(
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
unfinished_scan_data,
&stacks_db_conn,
&mut db_conns,
&moved_config,
Some(kill_signal),
&moved_ctx,
Expand Down
Loading

0 comments on commit d12acd9

Please sign in to comment.