From d12acd9c99ce0d0721dfcb853e5cb236e45925b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Fri, 25 Oct 2024 10:45:48 -0600 Subject: [PATCH] feat: store signer messages in local sqlite database (#664) Creates a local sqlite DB for signer messages (`stacks_signers.sqlite`) and stores all messages in it so we can later retrieve them via predicate scans. --- Cargo.lock | 2 + components/chainhook-cli/Cargo.toml | 2 + components/chainhook-cli/src/cli/mod.rs | 9 +- components/chainhook-cli/src/scan/stacks.rs | 29 +- components/chainhook-cli/src/service/mod.rs | 81 +-- .../chainhook-cli/src/service/runloops.rs | 7 +- components/chainhook-cli/src/storage/mod.rs | 51 +- .../chainhook-cli/src/storage/signers.rs | 498 ++++++++++++++++++ .../chainhook-cli/src/storage/sqlite.rs | 87 +++ .../src/chainhooks/stacks/mod.rs | 5 +- .../chainhook-sdk/src/indexer/stacks/mod.rs | 76 +-- .../chainhook-sdk/src/indexer/stacks/tests.rs | 12 +- components/chainhook-sdk/src/observer/mod.rs | 6 +- 13 files changed, 760 insertions(+), 105 deletions(-) create mode 100644 components/chainhook-cli/src/storage/signers.rs create mode 100644 components/chainhook-cli/src/storage/sqlite.rs diff --git a/Cargo.lock b/Cargo.lock index 989eeb269..ae93e54c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -496,11 +496,13 @@ dependencies = [ "rocket", "rocket_okapi", "rocksdb", + "rusqlite", "serde", "serde-redis", "serde_derive", "serde_json", "serial_test", + "slog", "tar", "test-case", "threadpool", diff --git a/components/chainhook-cli/Cargo.toml b/components/chainhook-cli/Cargo.toml index ba1b4e07f..d29ed21a3 100644 --- a/components/chainhook-cli/Cargo.toml +++ b/components/chainhook-cli/Cargo.toml @@ -31,6 +31,8 @@ reqwest = { version = "0.12", default-features = false, features = [ "rustls-tls", ] } tokio = { version = "1.38.1", features = ["full"] } +rusqlite = { version = "0.31.0", features = ["bundled"] } +slog = { version = "2.7.0" } futures-util = "0.3.24" flate2 = "1.0.24" 
tar = "0.4.38" diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index cc0be7bb5..219950f36 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -11,8 +11,8 @@ use crate::storage::{ delete_confirmed_entry_from_stacks_blocks, delete_unconfirmed_entry_from_stacks_blocks, get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, get_stacks_block_at_block_height, insert_unconfirmed_entry_in_stacks_blocks, - is_stacks_block_present, open_readonly_stacks_db_conn, open_readonly_stacks_db_conn_with_retry, - open_readwrite_stacks_db_conn, set_last_confirmed_insert_key, + is_stacks_block_present, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn, + set_last_confirmed_insert_key, StacksDbConnections, }; use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecification; use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap; @@ -547,15 +547,14 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { ) .await; // Refresh DB connection so it picks up recent changes made by TSV consolidation. 
- let new_conn = open_readonly_stacks_db_conn_with_retry( + let mut db_conns = StacksDbConnections::open_readonly( &config.expected_cache_path(), - 5, &ctx, )?; scan_stacks_chainstate_via_rocksdb_using_predicate( &predicate_spec, None, - &new_conn, + &mut db_conns, &config, None, &ctx, diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 9e021513e..77d74dad5 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -17,14 +17,18 @@ use crate::{ get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present, open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn, + signers::get_signer_db_messages_received_at_block, StacksDbConnections, }, }; -use chainhook_sdk::types::{BlockIdentifier, Chain}; use chainhook_sdk::{ chainhooks::stacks::evaluate_stacks_chainhook_on_blocks, indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer}, utils::Context, }; +use chainhook_sdk::{ + chainhooks::stacks::evaluate_stacks_predicate_on_non_consensus_events, + types::{BlockIdentifier, Chain}, +}; use chainhook_sdk::{ chainhooks::stacks::{ handle_stacks_hook_action, StacksChainhookInstance, StacksChainhookOccurrence, @@ -32,7 +36,6 @@ use chainhook_sdk::{ }, utils::{file_append, send_request, AbstractStacksBlock}, }; -use rocksdb::DB; use super::common::PredicateScanResult; @@ -180,11 +183,12 @@ pub async fn get_canonical_fork_from_tsv( pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( predicate_spec: &StacksChainhookInstance, unfinished_scan_data: Option, - stacks_db_conn: &DB, + db_conns: &mut StacksDbConnections, config: &Config, kill_signal: Option>>, ctx: &Context, ) -> Result { + let stacks_db_conn = &db_conns.stacks_db; let predicate_uuid = &predicate_spec.uuid; let mut chain_tip = match 
get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, @@ -327,11 +331,20 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( last_block_scanned = block_data.block_identifier.clone(); let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data]; - let (hits_per_blocks, _predicates_expired) = evaluate_stacks_chainhook_on_blocks(blocks, predicate_spec, ctx); - if hits_per_blocks.is_empty() { + let events = get_signer_db_messages_received_at_block( + &mut db_conns.signers_db, + &block_data.block_identifier, + )?; + let (hits_per_events, _) = evaluate_stacks_predicate_on_non_consensus_events( + &events, + predicate_spec, + ctx, + ); + + if hits_per_blocks.is_empty() && hits_per_events.is_empty() { continue; } @@ -339,8 +352,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( chainhook: predicate_spec, apply: hits_per_blocks, rollback: vec![], - // TODO(rafaelcr): Query for non consensus events which fall between block timestamps to fill in here - events: vec![] + events: hits_per_events, }; let res = match handle_stacks_hook_action( trigger, @@ -536,7 +548,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( apply: hits_per_blocks, rollback: vec![], // TODO(rafaelcr): Consider StackerDB chunks that come from TSVs. 
- events: vec![] + events: vec![], }; match handle_stacks_hook_action(trigger, &proofs, &config.get_event_observer_config(), ctx) { @@ -646,6 +658,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( } }; + // TODO(rafaelcr): Store signer messages insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?; if blocks_inserted % 2500 == 0 { diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 7e50be6d3..ea22ef976 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -5,6 +5,7 @@ use crate::config::{Config, PredicatesApi, PredicatesApiConfig}; use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; +use crate::storage::signers::{initialize_signers_db, store_signer_db_messages}; use crate::storage::{ confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks, get_last_block_height_inserted, open_readonly_stacks_db_conn_with_retry, @@ -19,6 +20,7 @@ use chainhook_sdk::observer::{ PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData, StacksObserverStartupContext, }; +use chainhook_sdk::{try_error, try_info}; use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent}; use chainhook_sdk::utils::Context; use redis::{Commands, Connection}; @@ -152,10 +154,12 @@ impl Service { } } + initialize_signers_db(&self.config.expected_cache_path(), &self.ctx) + .map_err(|e| format!("unable to initialize signers db: {e}"))?; + let (observer_command_tx, observer_command_rx) = observer_commands_tx_rx.unwrap_or(channel()); let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded(); - // let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel(); let mut event_observer_config 
= self.config.get_event_observer_config(); event_observer_config.registered_chainhooks = chainhook_store; @@ -441,12 +445,14 @@ impl Service { data, ) => { for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = expire_predicates_for_block( - &Chain::Bitcoin, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Bitcoin, + confirmed_block.block_identifier.index, + &mut predicates_db_conn, + &ctx, + ) + { for uuid in expired_predicate_uuids.into_iter() { let _ = observer_command_tx.send( ObserverCommand::ExpireBitcoinPredicate( @@ -466,12 +472,14 @@ impl Service { data, ) => { for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = expire_predicates_for_block( - &Chain::Bitcoin, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Bitcoin, + confirmed_block.block_identifier.index, + &mut predicates_db_conn, + &ctx, + ) + { for uuid in expired_predicate_uuids.into_iter() { let _ = observer_command_tx.send( ObserverCommand::ExpireBitcoinPredicate( @@ -547,10 +555,16 @@ impl Service { }; } StacksChainEvent::ChainUpdatedWithMicroblocks(_) - | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}, + | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} StacksChainEvent::ChainUpdatedWithNonConsensusEvents(data) => { - // TODO(rafaelcr): Store signer data. 
- println!("signer message: {:?}", data); + if let Err(e) = store_signer_db_messages( + &self.config.expected_cache_path(), + &data.events, + &self.ctx, + ) { + try_error!(self.ctx, "unable to store signer messages: {e}"); + }; + try_info!(self.ctx, "Stored {} stacks non-consensus events", data.events.len()); } }, Err(e) => { @@ -574,12 +588,14 @@ impl Service { StacksChainEvent::ChainUpdatedWithBlocks(data) => { stacks_event += 1; for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = expire_predicates_for_block( - &Chain::Stacks, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Stacks, + confirmed_block.block_identifier.index, + &mut predicates_db_conn, + &ctx, + ) + { for uuid in expired_predicate_uuids.into_iter() { let _ = observer_command_tx.send( ObserverCommand::ExpireStacksPredicate( @@ -597,12 +613,14 @@ impl Service { } StacksChainEvent::ChainUpdatedWithReorg(data) => { for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = expire_predicates_for_block( - &Chain::Stacks, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Stacks, + confirmed_block.block_identifier.index, + &mut predicates_db_conn, + &ctx, + ) + { for uuid in expired_predicate_uuids.into_iter() { let _ = observer_command_tx.send( ObserverCommand::ExpireStacksPredicate( @@ -619,10 +637,10 @@ impl Service { } } StacksChainEvent::ChainUpdatedWithMicroblocks(_) - | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}, + | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} StacksChainEvent::ChainUpdatedWithNonConsensusEvents(_) => { // TODO(rafaelcr): Expire signer message predicates when appropriate - }, + } }; update_status_from_report( Chain::Stacks, @@ -640,7 +658,8 @@ impl Service { 
&mut self.config, &self.ctx, ) - .await { + .await + { error!( self.ctx.expect_logger(), "Failed to update database from archive: {e}" diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index c271b60e0..c177b5636 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -19,8 +19,7 @@ use crate::{ bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, common::PredicateScanResult, stacks::scan_stacks_chainstate_via_rocksdb_using_predicate, }, - service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, - storage::open_readonly_stacks_db_conn, + service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, storage::StacksDbConnections, }; use super::ScanningData; @@ -54,7 +53,7 @@ pub fn start_stacks_scan_runloop( let kill_signal = Arc::new(RwLock::new(false)); kill_signals.insert(predicate_spec.uuid.clone(), kill_signal.clone()); stacks_scan_pool.execute(move || { - let stacks_db_conn = match open_readonly_stacks_db_conn( + let mut db_conns = match StacksDbConnections::open_readonly( &moved_config.expected_cache_path(), &moved_ctx, ) { @@ -75,7 +74,7 @@ pub fn start_stacks_scan_runloop( let op = scan_stacks_chainstate_via_rocksdb_using_predicate( &predicate_spec, unfinished_scan_data, - &stacks_db_conn, + &mut db_conns, &moved_config, Some(kill_signal), &moved_ctx, diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index 55fa9e8f9..25a48e6af 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -1,9 +1,14 @@ +pub mod signers; +pub mod sqlite; + use std::collections::VecDeque; use std::path::PathBuf; use chainhook_sdk::types::{BlockIdentifier, StacksBlockData, StacksBlockUpdate}; use chainhook_sdk::utils::Context; use rocksdb::{Options, DB}; +use rusqlite::Connection; +use 
signers::open_readonly_signers_db_conn; const UNCONFIRMED_KEY_PREFIX: &[u8; 2] = b"~:"; const CONFIRMED_KEY_PREFIX: &[u8; 2] = b"b:"; @@ -11,6 +16,24 @@ const KEY_SUFFIX: &[u8; 2] = b":d"; const LAST_UNCONFIRMED_KEY_PREFIX: &[u8; 3] = b"m:~"; const LAST_CONFIRMED_KEY_PREFIX: &[u8; 3] = b"m:t"; +/// Keeps references to all databases used to monitor Stacks transactions and events. +// TODO(rafaelcr): Expand this struct to be flexible enough to include Bitcoin DBs and/or turn some DBs on/off. +pub struct StacksDbConnections { + pub stacks_db: DB, + // TODO(rafaelcr): Make this optional if we're not interested in signer data. + pub signers_db: Connection, +} + +impl StacksDbConnections { + /// Opens all connections in read-only mode. + pub fn open_readonly(base_dir: &PathBuf, ctx: &Context) -> Result { + Ok(StacksDbConnections { + stacks_db: open_readonly_stacks_db_conn(base_dir, ctx)?, + signers_db: open_readonly_signers_db_conn(base_dir, ctx)?, + }) + } +} + fn get_db_default_options() -> Options { let mut opts = Options::default(); opts.create_if_missing(true); @@ -70,9 +93,8 @@ pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, ctx: &Context) -> Result { match open_readwrite_stacks_db_conn(base_dir, ctx) { Ok(_) => { - let db = DB::open_for_read_only(&opts, path, false).map_err(|e| { - format!("unable to open stacks.rocksdb: {}", e) - })?; + let db = DB::open_for_read_only(&opts, path, false) + .map_err(|e| format!("unable to open stacks.rocksdb: {}", e))?; Ok(db) } Err(e) => Err(e), @@ -87,8 +109,7 @@ pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, ctx: &Context) -> Result pub fn open_readwrite_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result { let path = get_default_stacks_db_file_path(base_dir); let opts = get_db_default_options(); - let db = DB::open(&opts, path) - .map_err(|e| format!("unable to open stacks.rocksdb: {}", e))?; + let db = DB::open(&opts, path).map_err(|e| format!("unable to open stacks.rocksdb: {}", e))?; Ok(db) } @@ 
-195,9 +216,12 @@ pub fn delete_confirmed_entry_from_stacks_blocks( pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { stacks_db .get(get_last_unconfirmed_insert_key()) - .unwrap_or(None).map(|bytes| u64::from_be_bytes([ + .unwrap_or(None) + .map(|bytes| { + u64::from_be_bytes([ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - ])) + ]) + }) } pub fn get_all_unconfirmed_blocks( @@ -226,9 +250,12 @@ pub fn get_all_unconfirmed_blocks( pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { stacks_db .get(get_last_confirmed_insert_key()) - .unwrap_or(None).map(|bytes| u64::from_be_bytes([ + .unwrap_or(None) + .map(|bytes| { + u64::from_be_bytes([ bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - ])) + ]) + }) } pub fn confirm_entries_in_stacks_blocks( @@ -273,10 +300,8 @@ pub fn get_stacks_block_at_block_height( }) { Ok(Some(entry)) => { return Ok(Some({ - let spec: StacksBlockData = - serde_json::from_slice(&entry[..]).map_err(|e| { - format!("unable to deserialize Stacks block {}", e) - })?; + let spec: StacksBlockData = serde_json::from_slice(&entry[..]) + .map_err(|e| format!("unable to deserialize Stacks block {}", e))?; spec })) } diff --git a/components/chainhook-cli/src/storage/signers.rs b/components/chainhook-cli/src/storage/signers.rs new file mode 100644 index 000000000..40a65321c --- /dev/null +++ b/components/chainhook-cli/src/storage/signers.rs @@ -0,0 +1,498 @@ +use std::path::PathBuf; + +use chainhook_sdk::{ + try_info, + types::{ + BlockAcceptedResponse, BlockIdentifier, BlockProposalData, BlockPushedData, + BlockRejectReasonCode, BlockRejectedResponse, BlockResponseData, BlockValidationFailedCode, + NakamotoBlockData, NakamotoBlockHeaderData, SignerMessageMetadata, + StacksNonConsensusEventData, StacksNonConsensusEventPayloadData, StacksSignerMessage, + StacksStackerDbChunk, + }, + utils::Context, +}; +use 
rusqlite::Connection; + +use super::sqlite::{create_or_open_readwrite_db, open_existing_readonly_db}; + +fn get_default_signers_db_file_path(base_dir: &PathBuf) -> PathBuf { + let mut destination_path = base_dir.clone(); + destination_path.push("stacks_signers.sqlite"); + destination_path +} + +pub fn open_readonly_signers_db_conn( + base_dir: &PathBuf, + ctx: &Context, +) -> Result { + let path = get_default_signers_db_file_path(&base_dir); + let conn = open_existing_readonly_db(&path, ctx)?; + Ok(conn) +} + +pub fn initialize_signers_db(base_dir: &PathBuf, ctx: &Context) -> Result { + let conn = create_or_open_readwrite_db(Some(&get_default_signers_db_file_path(base_dir)), ctx)?; + + // Stores message headers + conn.execute( + "CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pubkey TEXT NOT NULL, + contract TEXT NOT NULL, + sig TEXT NOT NULL, + received_at_ms INTEGER NOT NULL, + received_at_block_height INTEGER NOT NULL, + received_at_index_block_hash INTEGER NOT NULL, + type TEXT NOT NULL + )", + [], + ) + .map_err(|e| format!("unable to create table: {e}"))?; + conn.execute( + "CREATE INDEX IF NOT EXISTS index_messages_on_received_at ON messages(received_at_ms, received_at_block_height)", + [] + ).map_err(|e| format!("unable to create index: {e}"))?; + conn.execute( + "CREATE INDEX IF NOT EXISTS index_messages_on_pubkey ON messages(pubkey)", + [], + ) + .map_err(|e| format!("unable to create index: {e}"))?; + + // Stores both `BlockProposal` and `BlockPushed` messages. 
+ conn.execute( + "CREATE TABLE IF NOT EXISTS blocks ( + message_id INTEGER NOT NULL, + proposed BOOLEAN NOT NULL, + version INTEGER NOT NULL, + chain_length INTEGER NOT NULL, + burn_spent INTEGER NOT NULL, + consensus_hash TEXT NOT NULL, + parent_block_id TEXT NOT NULL, + tx_merkle_root TEXT NOT NULL, + state_index_root TEXT NOT NULL, + timestamp INTEGER NOT NULL, + miner_signature TEXT NOT NULL, + signer_signature TEXT NOT NULL, + pox_treatment TEXT NOT NULL, + block_hash TEXT NOT NULL, + index_block_hash TEXT NOT NULL, + proposal_burn_height INTEGER, + proposal_reward_cycle INTEGER, + UNIQUE(message_id), + FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE + )", + [], + ) + .map_err(|e| format!("unable to create table: {e}"))?; + + // Stores `BlockResponse` messages. + conn.execute( + "CREATE TABLE IF NOT EXISTS block_responses ( + message_id INTEGER NOT NULL, + accepted BOOLEAN NOT NULL, + signer_signature_hash TEXT NOT NULL, + signature TEXT NOT NULL, + server_version TEXT NOT NULL, + rejected_reason TEXT, + rejected_reason_code TEXT, + rejected_validation_failed_code TEXT, + rejected_chain_id INTEGER, + UNIQUE(message_id), + FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE + )", + [], + ) + .map_err(|e| format!("unable to create table: {e}"))?; + + Ok(conn) +} + +pub fn store_signer_db_messages( + base_dir: &PathBuf, + events: &Vec, + ctx: &Context, +) -> Result<(), String> { + use chainhook_sdk::types::{StacksNonConsensusEventPayloadData, StacksSignerMessage}; + + if events.len() == 0 { + return Ok(()); + } + let mut conn = + create_or_open_readwrite_db(Some(&get_default_signers_db_file_path(base_dir)), ctx)?; + let db_tx = conn + .transaction() + .map_err(|e| format!("unable to open db transaction: {e}"))?; + { + let mut message_stmt = db_tx + .prepare_cached( + "INSERT INTO messages + (pubkey, contract, sig, received_at_ms, received_at_block_height, received_at_index_block_hash, type) + VALUES (?,?,?,?,?,?,?) 
+ RETURNING id", + ) + .map_err(|e| format!("unable to prepare statement: {e}"))?; + for event in events.iter() { + match &event.payload { + StacksNonConsensusEventPayloadData::SignerMessage(chunk) => { + // Write message header. + let type_str = match chunk.message { + StacksSignerMessage::BlockProposal(_) => "block_proposal", + StacksSignerMessage::BlockResponse(_) => "block_response", + StacksSignerMessage::BlockPushed(_) => "block_pushed", + }; + let message_id: u64 = message_stmt + .query(rusqlite::params![ + &chunk.pubkey, + &chunk.contract, + &chunk.sig, + &event.received_at_ms, + &event.received_at_block.index, + &event.received_at_block.hash, + &type_str, + ]) + .map_err(|e| format!("unable to write message: {e}"))? + .next() + .map_err(|e| format!("unable to retrieve new message id: {e}"))? + .ok_or("message id is empty")? + .get(0) + .map_err(|e| format!("unable to convert message id: {e}"))?; + + // Write payload specifics. + match &chunk.message { + StacksSignerMessage::BlockProposal(data) => { + try_info!( + ctx, + "Storing stacks BlockProposal by signer {}", + chunk.pubkey + ); + let mut stmt = db_tx + .prepare("INSERT INTO blocks + (message_id, proposed, version, chain_length, burn_spent, consensus_hash, parent_block_id, + tx_merkle_root, state_index_root, timestamp, miner_signature, signer_signature, pox_treatment, + block_hash, index_block_hash, proposal_burn_height, proposal_reward_cycle) + VALUES (?,TRUE,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + .map_err(|e| format!("unable to prepare statement: {e}"))?; + stmt.execute(rusqlite::params![ + &message_id, + &data.block.header.version, + &data.block.header.chain_length, + &data.block.header.burn_spent, + &data.block.header.consensus_hash, + &data.block.header.parent_block_id, + &data.block.header.tx_merkle_root, + &data.block.header.state_index_root, + &data.block.header.timestamp, + &data.block.header.miner_signature, + &data.block.header.signer_signature.join(","), + &data.block.header.pox_treatment, + 
&data.block.block_hash, + &data.block.index_block_hash, + &data.burn_height, + &data.reward_cycle, + ]) + .map_err(|e| format!("unable to write block proposal: {e}"))?; + } + StacksSignerMessage::BlockPushed(data) => { + try_info!(ctx, "Storing stacks BlockPushed by signer {}", chunk.pubkey); + let mut stmt = db_tx + .prepare("INSERT INTO blocks + (message_id, proposed, version, chain_length, burn_spent, consensus_hash, parent_block_id, + tx_merkle_root, state_index_root, timestamp, miner_signature, signer_signature, pox_treatment, + block_hash, index_block_hash) + VALUES (?,FALSE,?,?,?,?,?,?,?,?,?,?,?,?,?)") + .map_err(|e| format!("unable to prepare statement: {e}"))?; + stmt.execute(rusqlite::params![ + &message_id, + &data.block.header.version, + &data.block.header.chain_length, + &data.block.header.burn_spent, + &data.block.header.consensus_hash, + &data.block.header.parent_block_id, + &data.block.header.tx_merkle_root, + &data.block.header.state_index_root, + &data.block.header.timestamp, + &data.block.header.miner_signature, + &data.block.header.signer_signature.join(","), + &data.block.header.pox_treatment, + &data.block.block_hash, + &data.block.index_block_hash, + ]) + .map_err(|e| format!("unable to write block pushed: {e}"))?; + } + StacksSignerMessage::BlockResponse(data) => { + match data { + BlockResponseData::Accepted(response) => { + try_info!( + ctx, + "Storing stacks BlockResponse (Accepted) by signer {}", + chunk.pubkey + ); + let mut stmt = db_tx + .prepare( + "INSERT INTO block_responses + (message_id, accepted, signer_signature_hash, signature, server_version) + VALUES (?,TRUE,?,?,?)", + ) + .map_err(|e| format!("unable to prepare statement: {e}"))?; + stmt.execute(rusqlite::params![ + &message_id, + &response.signer_signature_hash, + &response.signature, + &response.metadata.server_version, + ]) + .map_err(|e| format!("unable to write block pushed: {e}"))?; + } + BlockResponseData::Rejected(response) => { + try_info!( + ctx, + "Storing stacks 
BlockResponse (Rejected) by signer {}", + chunk.pubkey + ); + let mut validation_code: Option<&str> = None; + let reason_code = match &response.reason_code { + BlockRejectReasonCode::ValidationFailed { + validation_failed, + } => { + validation_code = match validation_failed { + BlockValidationFailedCode::BadBlockHash => { + Some("bad_block_hash") + } + BlockValidationFailedCode::BadTransaction => { + Some("bad_transaction") + } + BlockValidationFailedCode::InvalidBlock => { + Some("invalid_block") + } + BlockValidationFailedCode::ChainstateError => { + Some("chainstate_error") + } + BlockValidationFailedCode::UnknownParent => { + Some("unknown_parent") + } + BlockValidationFailedCode::NonCanonicalTenure => { + Some("no_canonical_tenure") + } + BlockValidationFailedCode::NoSuchTenure => { + Some("no_such_tenure") + } + }; + "validation_failed" + } + BlockRejectReasonCode::ConnectivityIssues => { + "connectivity_issues" + } + BlockRejectReasonCode::RejectedInPriorRound => { + "rejected_in_prior_round" + } + BlockRejectReasonCode::NoSortitionView => { + "no_sortition_view" + } + BlockRejectReasonCode::SortitionViewMismatch => { + "sortition_view_mismatch" + } + BlockRejectReasonCode::TestingDirective => { + "testing_directive" + } + }; + let mut stmt = db_tx + .prepare("INSERT INTO block_responses + (message_id, accepted, signer_signature_hash, signature, server_version, rejected_reason, + rejected_reason_code, rejected_validation_failed_code, rejected_chain_id) + VALUES (?,FALSE,?,?,?,?,?,?,?)") + .map_err(|e| format!("unable to prepare statement: {e}"))?; + stmt.execute(rusqlite::params![ + &message_id, + &response.signer_signature_hash, + &response.signature, + &response.metadata.server_version, + &response.reason, + &reason_code, + &validation_code, + &response.chain_id, + ]) + .map_err(|e| format!("unable to write block pushed: {e}"))?; + } + }; + } + } + } + } + } + } + db_tx + .commit() + .map_err(|e| format!("unable to commit db transaction: {e}"))?; + Ok(()) 
+} + +fn event_data_from_message_row( + pubkey: String, + contract: String, + sig: String, + received_at_ms: u64, + received_at_block_height: u64, + received_at_index_block_hash: String, + message: StacksSignerMessage, +) -> StacksNonConsensusEventData { + StacksNonConsensusEventData { + payload: StacksNonConsensusEventPayloadData::SignerMessage(StacksStackerDbChunk { + contract, + sig, + pubkey, + message, + }), + received_at_ms, + received_at_block: BlockIdentifier { + index: received_at_block_height, + hash: received_at_index_block_hash, + }, + } +} + +pub fn get_signer_db_messages_received_at_block( + db_conn: &mut Connection, + block_identifier: &BlockIdentifier, +) -> Result, String> { + let mut events = vec![]; + let db_tx = db_conn + .transaction() + .map_err(|e| format!("unable to open db transaction: {e}"))?; + { + let mut messages_stmt = db_tx + .prepare( + "SELECT id, pubkey, contract, sig, received_at_ms, received_at_block_height, received_at_index_block_hash, + type + FROM messages + WHERE received_at_block_height = ? + ORDER BY id ASC", + ) + .map_err(|e| format!("unable to prepare query: {e}"))?; + let mut messages_iter = messages_stmt + .query(rusqlite::params![&block_identifier.index]) + .map_err(|e| format!("unable to query messages: {e}"))?; + while let Some(row) = messages_iter + .next() + .map_err(|e| format!("row error: {e}"))? 
+ { + let message_id: u64 = row.get(0).unwrap(); + let pubkey: String = row.get(1).unwrap(); + let contract: String = row.get(2).unwrap(); + let sig: String = row.get(3).unwrap(); + let received_at_ms: u64 = row.get(4).unwrap(); + let received_at_block_height: u64 = row.get(5).unwrap(); + let received_at_index_block_hash: String = row.get(6).unwrap(); + let type_str: String = row.get(7).unwrap(); + let message = match type_str.as_str() { + "block_proposal" + | "block_pushed" => db_tx + .query_row( + "SELECT version, chain_length, burn_spent, consensus_hash, parent_block_id, tx_merkle_root, + state_index_root, timestamp, miner_signature, signer_signature, pox_treatment, block_hash, + index_block_hash, proposal_burn_height, proposal_reward_cycle + FROM blocks + WHERE message_id = ?", + rusqlite::params![&message_id], + |block_row| { + let signer_signature_str: String = block_row.get(9).unwrap(); + let header = NakamotoBlockHeaderData { + version: block_row.get(0).unwrap(), + chain_length: block_row.get(1).unwrap(), + burn_spent: block_row.get(2).unwrap(), + consensus_hash: block_row.get(3).unwrap(), + parent_block_id: block_row.get(4).unwrap(), + tx_merkle_root: block_row.get(5).unwrap(), + state_index_root: block_row.get(6).unwrap(), + timestamp: block_row.get(7).unwrap(), + miner_signature: block_row.get(8).unwrap(), + signer_signature: signer_signature_str.split(",").map(String::from).collect(), + pox_treatment: block_row.get(10).unwrap(), + }; + let block = NakamotoBlockData { + header, + block_hash: block_row.get(11).unwrap(), + index_block_hash: block_row.get(12).unwrap(), + transactions: vec![], + }; + if type_str == "block_proposal" { + Ok(StacksSignerMessage::BlockProposal(BlockProposalData { + block, + burn_height: block_row.get(13).unwrap(), + reward_cycle: block_row.get(14).unwrap(), + })) + } else { + Ok(StacksSignerMessage::BlockPushed(BlockPushedData { block })) + } + }, + ) + .map_err(|e| format!("unable to query block proposal: {e}"))?, + 
"block_response" => db_tx + .query_row( + "SELECT accepted, signer_signature_hash, signature, server_version, rejected_reason, + rejected_reason_code, rejected_validation_failed_code, rejected_chain_id + FROM block_responses + WHERE message_id = ?", + rusqlite::params![&message_id], + |response_row| { + let accepted: bool = response_row.get(0).unwrap(); + let signer_signature_hash: String = response_row.get(1).unwrap(); + let signature: String = response_row.get(2).unwrap(); + let metadata = SignerMessageMetadata { + server_version: response_row.get(3).unwrap() + }; + if accepted { + Ok(StacksSignerMessage::BlockResponse(BlockResponseData::Accepted(BlockAcceptedResponse { + signer_signature_hash, + signature, + metadata, + }))) + } else { + let rejected_reason_code: String = response_row.get(5).unwrap(); + Ok(StacksSignerMessage::BlockResponse(BlockResponseData::Rejected(BlockRejectedResponse { + signer_signature_hash, + signature, + metadata, + reason: response_row.get(4).unwrap(), + reason_code: match rejected_reason_code.as_str() { + "validation_failed" => { + let validation_code: String = response_row.get(6).unwrap(); + BlockRejectReasonCode::ValidationFailed { + validation_failed: match validation_code.as_str() { + "bad_block_hash" => BlockValidationFailedCode::BadBlockHash, + "bad_transaction" => BlockValidationFailedCode::BadTransaction, + "invalid_block" => BlockValidationFailedCode::InvalidBlock, + "chainstate_error" => BlockValidationFailedCode::ChainstateError, + "unknown_parent" => BlockValidationFailedCode::UnknownParent, + "no_canonical_tenure" => BlockValidationFailedCode::NonCanonicalTenure, + "no_such_tenure" => BlockValidationFailedCode::NoSuchTenure, + _ => unreachable!(), + } + } + }, + "connectivity_issues" => BlockRejectReasonCode::ConnectivityIssues, + "rejected_in_prior_round" => BlockRejectReasonCode::RejectedInPriorRound, + "no_sortition_view" => BlockRejectReasonCode::NoSortitionView, + "sortition_view_mismatch" => 
BlockRejectReasonCode::SortitionViewMismatch,
+                                "testing_directive" => BlockRejectReasonCode::TestingDirective,
+                                _ => unreachable!(),
+                            },
+                            chain_id: response_row.get(7).unwrap(),
+                        })))
+                    }
+                },
+            )
+            .map_err(|e| format!("unable to query block response: {e}"))?,
+            _ => return Err(format!("invalid message type: {type_str}")),
+        };
+        events.push(event_data_from_message_row(
+            pubkey,
+            contract,
+            sig,
+            received_at_ms,
+            received_at_block_height,
+            received_at_index_block_hash,
+            message,
+        ));
+        }
+    }
+    Ok(events)
+}
diff --git a/components/chainhook-cli/src/storage/sqlite.rs b/components/chainhook-cli/src/storage/sqlite.rs
new file mode 100644
index 000000000..a31df002e
--- /dev/null
+++ b/components/chainhook-cli/src/storage/sqlite.rs
@@ -0,0 +1,87 @@
+use std::path::PathBuf;
+
+use chainhook_sdk::{try_error, utils::Context};
+use rusqlite::{Connection, OpenFlags};
+
+/// Configures the SQLite connection with common settings.
+fn connection_with_defaults_pragma(conn: Connection) -> Result<Connection, String> {
+    conn.busy_timeout(std::time::Duration::from_secs(300))
+        .map_err(|e| format!("unable to set db timeout: {e}"))?;
+    conn.pragma_update(None, "mmap_size", 512 * 1024 * 1024)
+        .map_err(|e| format!("unable to set db mmap_size: {e}"))?;
+    conn.pragma_update(None, "cache_size", 512 * 1024 * 1024)
+        .map_err(|e| format!("unable to set db cache_size: {e}"))?;
+    conn.pragma_update(None, "journal_mode", &"WAL")
+        .map_err(|e| format!("unable to enable db wal: {e}"))?;
+    Ok(conn)
+}
+
+pub fn open_existing_readonly_db(db_path: &PathBuf, ctx: &Context) -> Result<Connection, String> {
+    let open_flags = match std::fs::metadata(db_path) {
+        Err(e) => {
+            if e.kind() == std::io::ErrorKind::NotFound {
+                return Err(format!("could not find {}", db_path.display()));
+            } else {
+                return Err(format!("could not stat {}", db_path.display()));
+            }
+        }
+        Ok(_md) => {
+            OpenFlags::SQLITE_OPEN_READ_ONLY
+        }
+    };
+    let conn = loop {
+        match Connection::open_with_flags(db_path, open_flags) {
+            Ok(conn) => break conn,
+            Err(e) => {
+                try_error!(ctx, "unable to open hord.rocksdb: {}", e.to_string());
+            }
+        };
+        std::thread::sleep(std::time::Duration::from_secs(1));
+    };
+    Ok(connection_with_defaults_pragma(conn)?)
+}
+
+pub fn create_or_open_readwrite_db(
+    db_path: Option<&PathBuf>,
+    ctx: &Context,
+) -> Result<Connection, String> {
+    let open_flags = if let Some(db_path) = db_path {
+        match std::fs::metadata(&db_path) {
+            Err(e) => {
+                if e.kind() == std::io::ErrorKind::NotFound {
+                    // Create the directory path that leads to the DB file
+                    if let Some(dirp) = PathBuf::from(&db_path).parent() {
+                        std::fs::create_dir_all(dirp)
+                            .map_err(|e| format!("unable to create db directory path: {e}"))?;
+                    }
+                    OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
+                } else {
+                    return Err(format!(
+                        "could not stat db directory {}: {e}",
+                        db_path.display()
+                    ));
+                }
+            }
+            Ok(_) => OpenFlags::SQLITE_OPEN_READ_WRITE,
+        }
+    } else {
+        OpenFlags::SQLITE_OPEN_READ_WRITE
+    };
+
+    let path = match db_path {
+        Some(path) => path.to_str().unwrap(),
+        None => ":memory:",
+    };
+    let conn = loop {
+        // Connect with retry.
+        match Connection::open_with_flags(&path, open_flags) {
+            Ok(conn) => break conn,
+            Err(e) => {
+                try_error!(ctx, "unable to open sqlite db: {e}");
+            }
+        };
+        std::thread::sleep(std::time::Duration::from_secs(1));
+    };
+
+    Ok(connection_with_defaults_pragma(conn)?)
+}
diff --git a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs
index a59344ea4..36995f1a4 100644
--- a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs
+++ b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs
@@ -873,10 +873,7 @@ pub fn evaluate_stacks_predicate_on_non_consensus_events<'a>(
             | StacksPredicate::NftEvent(_)
             | StacksPredicate::StxEvent(_)
             | StacksPredicate::PrintEvent(_)
-            | StacksPredicate::Txid(_) => {
-                // Ignore, possibly expected behavior?
- // https://github.com/hirosystems/chainhook/pull/663#discussion_r1814995429 - }, + | StacksPredicate::Txid(_) => {}, }; } (occurrences, expired_predicates) diff --git a/components/chainhook-sdk/src/indexer/stacks/mod.rs b/components/chainhook-sdk/src/indexer/stacks/mod.rs index 7ebf90c16..9cc5b8928 100644 --- a/components/chainhook-sdk/src/indexer/stacks/mod.rs +++ b/components/chainhook-sdk/src/indexer/stacks/mod.rs @@ -478,9 +478,7 @@ pub fn standardize_stacks_block( let signer_sig_hash = block .signer_signature_hash .as_ref() - .map(|hash| { - hex::decode(&hash[2..]).expect("unable to decode signer_signature hex") - }); + .map(|hash| hex::decode(&hash[2..]).expect("unable to decode signer_signature hex")); let block = StacksBlockData { block_identifier: BlockIdentifier { @@ -513,16 +511,24 @@ pub fn standardize_stacks_block( signer_signature: block.signer_signature.clone(), signer_public_keys: match (signer_sig_hash, &block.signer_signature) { - (Some(signer_sig_hash), Some(signatures)) => { - Some(signatures.iter().map(|sig_hex| { - let sig_msg = clarity::util::secp256k1::MessageSignature::from_hex(sig_hex) - .map_err(|e| format!("unable to parse signer signature message: {}", e))?; - let pubkey = get_signer_pubkey_from_message_hash(&signer_sig_hash, &sig_msg) - .map_err(|e| format!("unable to recover signer sig pubkey: {}", e))?; - Ok(format!("0x{}", hex::encode(pubkey))) - }) - .collect::, String>>()?) 
-            }
+            (Some(signer_sig_hash), Some(signatures)) => Some(
+                signatures
+                    .iter()
+                    .map(|sig_hex| {
+                        let sig_msg =
+                            clarity::util::secp256k1::MessageSignature::from_hex(sig_hex)
+                                .map_err(|e| {
+                                    format!("unable to parse signer signature message: {}", e)
+                                })?;
+                        let pubkey =
+                            get_signer_pubkey_from_message_hash(&signer_sig_hash, &sig_msg)
+                                .map_err(|e| {
+                                    format!("unable to recover signer sig pubkey: {}", e)
+                                })?;
+                        Ok(format!("0x{}", hex::encode(pubkey)))
+                    })
+                    .collect::<Result<Vec<String>, String>>()?,
+            ),
             _ => None,
         },
@@ -676,35 +682,33 @@ pub fn standardize_stacks_microblock_trail(
 #[cfg(feature = "stacks-signers")]
 pub fn standardize_stacks_marshalled_stackerdb_chunks(
     marshalled_stackerdb_chunks: JsonValue,
-    _ctx: &Context,
+    ctx: &Context,
 ) -> Result<Vec<StacksStackerDbChunk>, String> {
     let mut stackerdb_chunks: NewStackerDbChunks =
         serde_json::from_value(marshalled_stackerdb_chunks)
             .map_err(|e| format!("unable to parse stackerdb chunks {e}"))?;
-    standardize_stacks_stackerdb_chunks(&mut stackerdb_chunks)
+    standardize_stacks_stackerdb_chunks(&mut stackerdb_chunks, ctx)
 }
 
 #[cfg(feature = "stacks-signers")]
 pub fn standardize_stacks_stackerdb_chunks(
     stackerdb_chunks: &NewStackerDbChunks,
+    ctx: &Context,
 ) -> Result<Vec<StacksStackerDbChunk>, String> {
     use stacks_codec::codec::BlockResponse;
     use stacks_codec::codec::RejectCode;
     use stacks_codec::codec::SignerMessage;
     use stacks_codec::codec::ValidateRejectCode;
 
+    use crate::try_debug;
+
     let contract_id = &stackerdb_chunks.contract_id.name;
     let mut parsed_chunks: Vec<StacksStackerDbChunk> = vec![];
     for slot in stackerdb_chunks.modified_slots.iter() {
-        let data_bytes = match hex::decode(&slot.data) {
-            Ok(bytes) => bytes,
-            Err(e) => return Err(format!("unable to decode signer slot hex data: {e}")),
-        };
-        let signer_message =
-            match SignerMessage::consensus_deserialize(&mut Cursor::new(&data_bytes)) {
-                Ok(message) => message,
-                Err(e) => return Err(format!("unable to deserialize SignerMessage: {e}")),
-            };
+        let data_bytes = hex::decode(&slot.data)
+            .map_err(|e| format!("unable to
decode signer slot hex data: {e}"))?; + let signer_message = SignerMessage::consensus_deserialize(&mut Cursor::new(&data_bytes)) + .map_err(|e| format!("unable to deserialize SignerMessage: {e}"))?; let message = match signer_message { SignerMessage::BlockProposal(block_proposal) => { StacksSignerMessage::BlockProposal(BlockProposalData { @@ -716,11 +720,14 @@ pub fn standardize_stacks_stackerdb_chunks( SignerMessage::BlockResponse(block_response) => match block_response { BlockResponse::Accepted(block_accepted) => StacksSignerMessage::BlockResponse( BlockResponseData::Accepted(BlockAcceptedResponse { - signer_signature_hash: format!("0x{}", block_accepted.signer_signature_hash.to_hex()), + signer_signature_hash: format!( + "0x{}", + block_accepted.signer_signature_hash.to_hex() + ), signature: format!("0x{}", block_accepted.signature.to_hex()), metadata: SignerMessageMetadata { server_version: block_accepted.metadata.server_version, - } + }, }), ), BlockResponse::Rejected(block_rejection) => StacksSignerMessage::BlockResponse( @@ -783,9 +790,16 @@ pub fn standardize_stacks_stackerdb_chunks( block: standardize_stacks_nakamoto_block(&nakamoto_block)?, }) } - SignerMessage::MockSignature(_) - | SignerMessage::MockProposal(_) - | SignerMessage::MockBlock(_) => { + SignerMessage::MockSignature(_) => { + try_debug!(ctx, "Ignoring MockSignature stacks signer message"); + continue; + } + SignerMessage::MockProposal(_) => { + try_debug!(ctx, "Ignoring MockProposal stacks signer message"); + continue; + } + SignerMessage::MockBlock(_) => { + try_debug!(ctx, "Ignoring MockBlock stacks signer message"); continue; } }; @@ -898,8 +912,8 @@ pub fn get_signer_pubkey_from_message_hash( RecoveryId::from_i32(rec_id as i32).map_err(|e| format!("invalid recovery id: {e}"))?; let signature = RecoverableSignature::from_compact(&sig, recovery_id) .map_err(|e| format!("invalid signature: {e}"))?; - let message = - Message::from_digest_slice(&message_hash).map_err(|e| format!("invalid 
digest message: {e}"))?; + let message = Message::from_digest_slice(&message_hash) + .map_err(|e| format!("invalid digest message: {e}"))?; let pubkey = secp .recover_ecdsa(&message, &signature) diff --git a/components/chainhook-sdk/src/indexer/stacks/tests.rs b/components/chainhook-sdk/src/indexer/stacks/tests.rs index f4a005c97..bf775d59b 100644 --- a/components/chainhook-sdk/src/indexer/stacks/tests.rs +++ b/components/chainhook-sdk/src/indexer/stacks/tests.rs @@ -404,9 +404,12 @@ fn into_chainhook_event_rejects_invalid_missing_event() { fn parses_block_response_signer_message() { use chainhook_types::{BlockResponseData, StacksSignerMessage}; - use crate::indexer::stacks::{ - NewSignerModifiedSlot, NewStackerDbChunkIssuerId, NewStackerDbChunkIssuerSlots, - NewStackerDbChunks, NewStackerDbChunksContractId, + use crate::{ + indexer::stacks::{ + NewSignerModifiedSlot, NewStackerDbChunkIssuerId, NewStackerDbChunkIssuerSlots, + NewStackerDbChunks, NewStackerDbChunksContractId, + }, + utils::Context, }; use super::standardize_stacks_stackerdb_chunks; @@ -426,7 +429,8 @@ fn parses_block_response_signer_message() { slot_version: 11, }], }; - let parsed_chunk = standardize_stacks_stackerdb_chunks(&new_chunks).unwrap(); + let ctx = &Context::empty(); + let parsed_chunk = standardize_stacks_stackerdb_chunks(&new_chunks, ctx).unwrap(); assert_eq!(parsed_chunk.len(), 1); let message = &parsed_chunk[0]; diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 08f8248af..1abe47ae7 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -959,11 +959,7 @@ pub async fn start_stacks_event_observer( indexer.seed_stacks_block_pool(stacks_startup_context.block_pool_seed, &ctx); let log_level = if config.display_stacks_ingestion_logs { - if cfg!(feature = "cli") { - LogLevel::Critical - } else { - LogLevel::Debug - } + LogLevel::Debug } else { LogLevel::Off };