diff --git a/CHANGELOG.md b/CHANGELOG.md index 788b7bf45..f1ab0498e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ ### Changes +* [BREAKING] Refactored the sync process to use a new `SyncState` component (#650). * [BREAKING] Return `None` instead of `Err` when an entity is not found (#632). * Add support for notes without assets in transaction requests (#654). * Refactored RPC functions and structs to improve code quality (#616). diff --git a/crates/rust-client/src/lib.rs b/crates/rust-client/src/lib.rs index 1001b4b08..657fd4471 100644 --- a/crates/rust-client/src/lib.rs +++ b/crates/rust-client/src/lib.rs @@ -58,6 +58,7 @@ //! crypto::RpoRandomCoin, //! rpc::{Endpoint, TonicRpcClient}, //! store::{sqlite_store::SqliteStore, Store}, +//! sync::{on_note_received, StateSync}, //! Client, Felt, //! }; //! use miden_objects::crypto::rand::FeltRng; diff --git a/crates/rust-client/src/note/import.rs b/crates/rust-client/src/note/import.rs index 3e6fb824c..56bc4739a 100644 --- a/crates/rust-client/src/note/import.rs +++ b/crates/rust-client/src/note/import.rs @@ -174,7 +174,7 @@ impl Client { note_record.inclusion_proof_received(inclusion_proof, metadata)?; if block_height < current_block_num { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.build_current_partial_mmr().await?; let block_header = self .get_and_store_authenticated_block(block_height, &mut current_partial_mmr) @@ -218,7 +218,7 @@ impl Client { match committed_note_data { Some((metadata, inclusion_proof)) => { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.build_current_partial_mmr().await?; let block_header = self .get_and_store_authenticated_block( inclusion_proof.location().block_num(), diff --git a/crates/rust-client/src/note/mod.rs b/crates/rust-client/src/note/mod.rs index a1ab94a03..7154e0cf7 100644 --- a/crates/rust-client/src/note/mod.rs +++ b/crates/rust-client/src/note/mod.rs @@ -56,11 +56,7 @@ //! For more details on the API and error handling, see the documentation for the specific functions //! and types in this module. -use alloc::{ - collections::{BTreeMap, BTreeSet}, - string::ToString, - vec::Vec, -}; +use alloc::{string::ToString, vec::Vec}; use miden_lib::transaction::TransactionKernel; use miden_objects::{account::AccountId, crypto::rand::FeltRng}; @@ -74,6 +70,7 @@ pub mod script_roots; mod import; mod note_screener; +mod note_updates; // RE-EXPORTS // ================================================================================================ @@ -92,6 +89,7 @@ pub use miden_objects::{ NoteError, }; pub use note_screener::{NoteConsumability, NoteRelevance, NoteScreener, NoteScreenerError}; +pub use note_updates::NoteUpdates; /// Contains functions to simplify standard note scripts creation. pub mod scripts { @@ -239,95 +237,3 @@ pub async fn get_input_note_with_id_prefix( .pop() .expect("input_note_records should always have one element")) } - -// NOTE UPDATES -// ------------------------------------------------------------------------------------------------ - -/// Contains note changes to apply to the store. -#[derive(Clone, Debug, Default)] -pub struct NoteUpdates { - /// A map of new and updated input note records to be upserted in the store. - updated_input_notes: BTreeMap, - /// A map of updated output note records to be upserted in the store. - updated_output_notes: BTreeMap, -} - -impl NoteUpdates { - /// Creates a [`NoteUpdates`]. - pub fn new( - updated_input_notes: impl IntoIterator, - updated_output_notes: impl IntoIterator, - ) -> Self { - Self { - updated_input_notes: updated_input_notes - .into_iter() - .map(|note| (note.id(), note)) - .collect(), - updated_output_notes: updated_output_notes - .into_iter() - .map(|note| (note.id(), note)) - .collect(), - } - } - - /// Returns all input note records that have been updated. - /// This may include: - /// - New notes that have been created that should be inserted. - /// - Existing tracked notes that should be updated. - pub fn updated_input_notes(&self) -> impl Iterator { - self.updated_input_notes.values() - } - - /// Returns all output note records that have been updated. - /// This may include: - /// - New notes that have been created that should be inserted. - /// - Existing tracked notes that should be updated. - pub fn updated_output_notes(&self) -> impl Iterator { - self.updated_output_notes.values() - } - - /// Returns whether no new note-related information has been retrieved. - pub fn is_empty(&self) -> bool { - self.updated_input_notes.is_empty() && self.updated_output_notes.is_empty() - } - - /// Returns any note that has been committed into the chain in this update (either new or - /// already locally tracked) - pub fn committed_input_notes(&self) -> impl Iterator { - self.updated_input_notes.values().filter(|note| note.is_committed()) - } - - /// Returns the IDs of all notes that have been committed in this update. - /// This includes both new notes and tracked expected notes that were committed in this update. - pub fn committed_note_ids(&self) -> BTreeSet { - let committed_output_note_ids = self - .updated_output_notes - .values() - .filter_map(|note_record| note_record.is_committed().then_some(note_record.id())); - - let committed_input_note_ids = self - .updated_input_notes - .values() - .filter_map(|note_record| note_record.is_committed().then_some(note_record.id())); - - committed_input_note_ids - .chain(committed_output_note_ids) - .collect::>() - } - - /// Returns the IDs of all notes that have been consumed. - /// This includes both notes that have been consumed locally or externally in this update. - pub fn consumed_note_ids(&self) -> BTreeSet { - let consumed_output_note_ids = self - .updated_output_notes - .values() - .filter_map(|note_record| note_record.is_consumed().then_some(note_record.id())); - - let consumed_input_note_ids = self - .updated_input_notes - .values() - .filter_map(|note_record| note_record.is_consumed().then_some(note_record.id())); - - consumed_input_note_ids.chain(consumed_output_note_ids).collect::>() - } -} diff --git a/crates/rust-client/src/note/note_screener.rs b/crates/rust-client/src/note/note_screener.rs index b4dc9dae7..8a444cdd7 100644 --- a/crates/rust-client/src/note/note_screener.rs +++ b/crates/rust-client/src/note/note_screener.rs @@ -57,6 +57,8 @@ impl NoteScreener { /// Does a fast check for known scripts (P2ID, P2IDR, SWAP). We're currently /// unable to execute notes that aren't committed so a slow check for other scripts is /// currently not available. + /// + /// If relevance can't be determined, the screener defaults to setting the note as consumable. pub async fn check_relevance( &self, note: &Note, diff --git a/crates/rust-client/src/note/note_updates.rs b/crates/rust-client/src/note/note_updates.rs new file mode 100644 index 000000000..867e543f6 --- /dev/null +++ b/crates/rust-client/src/note/note_updates.rs @@ -0,0 +1,247 @@ +use alloc::collections::{BTreeMap, BTreeSet}; + +use miden_objects::{ + block::BlockHeader, + note::{NoteId, NoteInclusionProof, Nullifier}, + transaction::TransactionId, +}; + +use crate::{ + rpc::domain::{ + note::CommittedNote, nullifier::NullifierUpdate, transaction::TransactionUpdate, + }, + store::{InputNoteRecord, OutputNoteRecord}, + sync::NoteTagRecord, + ClientError, +}; + +// NOTE UPDATES +// ================================================================================================ + +/// Contains note changes to apply to the store. +#[derive(Clone, Debug, Default)] +pub struct NoteUpdates { + /// A map of new and updated input note records to be upserted in the store. + updated_input_notes: BTreeMap, + /// A map of updated output note records to be upserted in the store. + updated_output_notes: BTreeMap, +} + +impl NoteUpdates { + /// Creates a [`NoteUpdates`]. + pub fn new( + updated_input_notes: impl IntoIterator, + updated_output_notes: impl IntoIterator, + ) -> Self { + Self { + updated_input_notes: updated_input_notes + .into_iter() + .map(|note| (note.id(), note)) + .collect(), + updated_output_notes: updated_output_notes + .into_iter() + .map(|note| (note.id(), note)) + .collect(), + } + } + + // GETTERS + // -------------------------------------------------------------------------------------------- + + /// Returns all input note records that have been updated. + /// This may include: + /// - New notes that have been created that should be inserted. + /// - Existing tracked notes that should be updated. + pub fn updated_input_notes(&self) -> impl Iterator { + self.updated_input_notes.values() + } + + /// Returns all output note records that have been updated. + /// This may include: + /// - New notes that have been created that should be inserted. + /// - Existing tracked notes that should be updated. + pub fn updated_output_notes(&self) -> impl Iterator { + self.updated_output_notes.values() + } + + /// Returns whether no new note-related information has been retrieved. + pub fn is_empty(&self) -> bool { + self.updated_input_notes.is_empty() && self.updated_output_notes.is_empty() + } + + /// Returns any note that has been committed into the chain in this update (either new or + /// already locally tracked) + pub fn committed_input_notes(&self) -> impl Iterator { + self.updated_input_notes.values().filter(|note| note.is_committed()) + } + + /// Returns the tags of all notes that need to be removed from the store after the state sync. + /// These are the tags of notes that have been committed and no longer need to be tracked. + pub fn tags_to_remove(&self) -> impl Iterator + '_ { + self.committed_input_notes().map(|note| { + NoteTagRecord::with_note_source( + note.metadata().expect("Committed notes should have metadata").tag(), + note.id(), + ) + }) + } + + /// Returns the IDs of all notes that have been committed in this update. + /// This includes both new notes and tracked expected notes that were committed in this update. + pub fn committed_note_ids(&self) -> BTreeSet { + let committed_output_note_ids = self + .updated_output_notes + .values() + .filter_map(|note_record| note_record.is_committed().then_some(note_record.id())); + + let committed_input_note_ids = self + .updated_input_notes + .values() + .filter_map(|note_record| note_record.is_committed().then_some(note_record.id())); + + committed_input_note_ids + .chain(committed_output_note_ids) + .collect::>() + } + + /// Returns the IDs of all notes that have been consumed. + /// This includes both notes that have been consumed locally or externally in this update. + pub fn consumed_note_ids(&self) -> BTreeSet { + let consumed_output_note_ids = self + .updated_output_notes + .values() + .filter_map(|note_record| note_record.is_consumed().then_some(note_record.id())); + + let consumed_input_note_ids = self + .updated_input_notes + .values() + .filter_map(|note_record| note_record.is_consumed().then_some(note_record.id())); + + consumed_input_note_ids.chain(consumed_output_note_ids).collect::>() + } + + // UPDATE METHODS + // -------------------------------------------------------------------------------------------- + + /// Inserts new or updated input and output notes. If an update with the same note ID already + /// exists, it will be replaced. + pub(crate) fn insert_updates( + &mut self, + input_note: Option, + output_note: Option, + ) { + if let Some(input_note) = input_note { + self.updated_input_notes.insert(input_note.id(), input_note); + } + if let Some(output_note) = output_note { + self.updated_output_notes.insert(output_note.id(), output_note); + } + } + + /// Applies the necessary state transitions to the [`NoteUpdates`] when a note is committed in a + /// block. + pub(crate) fn apply_committed_note_state_transitions( + &mut self, + committed_note: &CommittedNote, + block_header: &BlockHeader, + ) -> Result<(), ClientError> { + let inclusion_proof = NoteInclusionProof::new( + block_header.block_num(), + committed_note.note_index(), + committed_note.merkle_path().clone(), + )?; + + if let Some(input_note_record) = self.get_input_note_by_id(*committed_note.note_id()) { + // The note belongs to our locally tracked set of input notes + input_note_record + .inclusion_proof_received(inclusion_proof.clone(), committed_note.metadata())?; + input_note_record.block_header_received(block_header)?; + } + + if let Some(output_note_record) = self.get_output_note_by_id(*committed_note.note_id()) { + // The note belongs to our locally tracked set of output notes + output_note_record.inclusion_proof_received(inclusion_proof.clone())?; + } + + Ok(()) + } + + /// Applies the necessary state transitions to the [`NoteUpdates`] when a note is nullified in a + /// block. For input note records two possible scenarios are considered: + /// 1. The note was being processed by a local transaction that just got committed. + /// 2. The note was consumed by an external transaction. If a local transaction was processing + /// the note and it didn't get committed, the transaction should be discarded. + pub(crate) fn apply_nullifiers_state_transitions( + &mut self, + nullifier_update: &NullifierUpdate, + transaction_updates: &[TransactionUpdate], + ) -> Result, ClientError> { + let mut discarded_transaction = None; + + if let Some(input_note_record) = + self.get_input_note_by_nullifier(nullifier_update.nullifier) + { + if let Some(consumer_transaction) = transaction_updates + .iter() + .find(|t| input_note_record.consumer_transaction_id() == Some(&t.transaction_id)) + { + // The note was being processed by a local transaction that just got committed + input_note_record.transaction_committed( + consumer_transaction.transaction_id, + consumer_transaction.block_num, + )?; + } else { + // The note was consumed by an external transaction + if let Some(id) = input_note_record.consumer_transaction_id() { + // The note was being processed by a local transaction that didn't end up being + // committed so it should be discarded + discarded_transaction.replace(*id); + } + input_note_record + .consumed_externally(nullifier_update.nullifier, nullifier_update.block_num)?; + } + } + + if let Some(output_note_record) = + self.get_output_note_by_nullifier(nullifier_update.nullifier) + { + output_note_record + .nullifier_received(nullifier_update.nullifier, nullifier_update.block_num)?; + } + + Ok(discarded_transaction) + } + + // PRIVATE HELPERS + // -------------------------------------------------------------------------------------------- + + /// Returns a mutable reference to the input note record with the provided ID if it exists. + fn get_input_note_by_id(&mut self, note_id: NoteId) -> Option<&mut InputNoteRecord> { + self.updated_input_notes.get_mut(¬e_id) + } + + /// Returns a mutable reference to the output note record with the provided ID if it exists. + fn get_output_note_by_id(&mut self, note_id: NoteId) -> Option<&mut OutputNoteRecord> { + self.updated_output_notes.get_mut(¬e_id) + } + + /// Returns a mutable reference to the input note record with the provided nullifier if it + /// exists. + fn get_input_note_by_nullifier( + &mut self, + nullifier: Nullifier, + ) -> Option<&mut InputNoteRecord> { + self.updated_input_notes.values_mut().find(|note| note.nullifier() == nullifier) + } + + /// Returns a mutable reference to the output note record with the provided nullifier if it + /// exists. + fn get_output_note_by_nullifier( + &mut self, + nullifier: Nullifier, + ) -> Option<&mut OutputNoteRecord> { + self.updated_output_notes + .values_mut() + .find(|note| note.nullifier() == Some(nullifier)) + } +} diff --git a/crates/rust-client/src/rpc/domain/nullifier.rs b/crates/rust-client/src/rpc/domain/nullifier.rs index 3b0dab6e8..0c5e61779 100644 --- a/crates/rust-client/src/rpc/domain/nullifier.rs +++ b/crates/rust-client/src/rpc/domain/nullifier.rs @@ -7,6 +7,7 @@ use crate::rpc::{self, errors::RpcConversionError, generated::digest::Digest}; // ================================================================================================ /// Represents a note that was consumed in the node at a certain block. +#[derive(Debug, Clone)] pub struct NullifierUpdate { /// The nullifier of the consumed note. pub nullifier: Nullifier, diff --git a/crates/rust-client/src/rpc/domain/transaction.rs b/crates/rust-client/src/rpc/domain/transaction.rs index 0298f04d9..288863b48 100644 --- a/crates/rust-client/src/rpc/domain/transaction.rs +++ b/crates/rust-client/src/rpc/domain/transaction.rs @@ -35,6 +35,7 @@ impl TryFrom for TransactionId { // ================================================================================================ /// Represents a transaction that was included in the node at a certain block. +#[derive(Debug, Clone)] pub struct TransactionUpdate { /// The transaction identifier. pub transaction_id: TransactionId, diff --git a/crates/rust-client/src/store/mod.rs b/crates/rust-client/src/store/mod.rs index 484b6f37e..c7d46ee9a 100644 --- a/crates/rust-client/src/store/mod.rs +++ b/crates/rust-client/src/store/mod.rs @@ -33,12 +33,10 @@ use miden_objects::{ block::{BlockHeader, BlockNumber}, crypto::merkle::{InOrderIndex, MmrPeaks}, note::{NoteId, NoteTag, Nullifier}, - transaction::TransactionId, Digest, Word, }; use crate::{ - note::NoteUpdates, sync::{NoteTagRecord, StateSyncUpdate}, transaction::{TransactionRecord, TransactionStoreUpdate}, }; @@ -308,19 +306,11 @@ pub trait Store: Send + Sync { /// - Updating the corresponding tracked input/output notes. /// - Removing note tags that are no longer relevant. /// - Updating transactions in the store, marking as `committed` or `discarded`. + /// - In turn, validating private account's state transitions. If a private account's hash + /// locally does not match the `StateSyncUpdate` information, the account may be locked. /// - Storing new MMR authentication nodes. - /// - Updating the tracked on-chain accounts. + /// - Updating the tracked public accounts. async fn apply_state_sync(&self, state_sync_update: StateSyncUpdate) -> Result<(), StoreError>; - - /// Applies nullifier updates to database. - /// Nullifiers are retrieved after completing a `StateSync`. - /// - /// This operation is temporary, to be removed as part of miden-client/650. - async fn apply_nullifiers( - &self, - note_updates: NoteUpdates, - transactions_to_discard: Vec, - ) -> Result<(), StoreError>; } // CHAIN MMR NODE FILTER diff --git a/crates/rust-client/src/store/sqlite_store/mod.rs b/crates/rust-client/src/store/sqlite_store/mod.rs index 1f94b3835..da555e80a 100644 --- a/crates/rust-client/src/store/sqlite_store/mod.rs +++ b/crates/rust-client/src/store/sqlite_store/mod.rs @@ -17,7 +17,6 @@ use miden_objects::{ block::{BlockHeader, BlockNumber}, crypto::merkle::{InOrderIndex, MmrPeaks}, note::{NoteTag, Nullifier}, - transaction::TransactionId, Digest, Word, }; use rusqlite::{types::Value, vtab::array, Connection}; @@ -28,7 +27,6 @@ use super::{ OutputNoteRecord, Store, TransactionFilter, }; use crate::{ - note::NoteUpdates, store::StoreError, sync::{NoteTagRecord, StateSyncUpdate}, transaction::{TransactionRecord, TransactionStoreUpdate}, @@ -329,17 +327,6 @@ impl Store for SqliteStore { self.interact_with_connection(SqliteStore::get_unspent_input_note_nullifiers) .await } - - async fn apply_nullifiers( - &self, - note_updates: NoteUpdates, - transactions_to_discard: Vec, - ) -> Result<(), StoreError> { - self.interact_with_connection(move |conn| { - SqliteStore::apply_nullifiers(conn, ¬e_updates, &transactions_to_discard) - }) - .await - } } // UTILS diff --git a/crates/rust-client/src/store/sqlite_store/sync.rs b/crates/rust-client/src/store/sqlite_store/sync.rs index f9129933b..cc1f5c674 100644 --- a/crates/rust-client/src/store/sqlite_store/sync.rs +++ b/crates/rust-client/src/store/sqlite_store/sync.rs @@ -2,13 +2,12 @@ use alloc::{collections::BTreeSet, vec::Vec}; -use miden_objects::{block::BlockNumber, note::NoteTag, transaction::TransactionId}; +use miden_objects::{block::BlockNumber, note::NoteTag}; use miden_tx::utils::{Deserializable, Serializable}; use rusqlite::{params, Connection, Transaction}; use super::SqliteStore; use crate::{ - note::NoteUpdates, store::{ sqlite_store::{ account::{lock_account, update_account}, @@ -103,35 +102,55 @@ impl SqliteStore { state_sync_update: StateSyncUpdate, ) -> Result<(), StoreError> { let StateSyncUpdate { - block_header, - block_has_relevant_notes, - new_mmr_peaks, - new_authentication_nodes, + block_num, + block_updates, note_updates, transaction_updates, account_updates, - tags_to_remove, } = state_sync_update; + let mut locked_accounts = vec![]; + + for (account_id, digest) in account_updates.mismatched_private_accounts() { + // Mismatched digests may be due to stale network data. If the mismatched digest is + // tracked in the db and corresponds to the mismatched account, it means we + // got a past update and shouldn't lock the account. + if let Some(account) = Self::get_account_header_by_hash(conn, *digest)? { + if account.id() == *account_id { + continue; + } + } + + locked_accounts.push(*account_id); + } + let tx = conn.transaction()?; // Update state sync block number const BLOCK_NUMBER_QUERY: &str = "UPDATE state_sync SET block_num = ?"; - tx.execute(BLOCK_NUMBER_QUERY, params![i64::from(block_header.block_num().as_u32())])?; + tx.execute(BLOCK_NUMBER_QUERY, params![i64::from(block_num.as_u32())])?; + + for (block_header, block_has_relevant_notes, new_mmr_peaks) in block_updates.block_headers() + { + Self::insert_block_header_tx( + &tx, + block_header, + new_mmr_peaks, + *block_has_relevant_notes, + )?; + } - Self::insert_block_header_tx(&tx, &block_header, &new_mmr_peaks, block_has_relevant_notes)?; + // Insert new authentication nodes (inner nodes of the PartialMmr) + Self::insert_chain_mmr_nodes_tx(&tx, block_updates.new_authentication_nodes())?; // Update notes apply_note_updates_tx(&tx, ¬e_updates)?; // Remove tags - for tag in tags_to_remove { + for tag in note_updates.tags_to_remove() { remove_note_tag_tx(&tx, tag)?; } - // Insert new authentication nodes (inner nodes of the PartialMmr) - Self::insert_chain_mmr_nodes_tx(&tx, &new_authentication_nodes)?; - // Mark transactions as committed Self::mark_transactions_as_committed(&tx, transaction_updates.committed_transactions())?; @@ -143,8 +162,8 @@ impl SqliteStore { update_account(&tx, account)?; } - for (account_id, _) in account_updates.mismatched_private_accounts() { - lock_account(&tx, *account_id)?; + for account_id in locked_accounts { + lock_account(&tx, account_id)?; } // Commit the updates @@ -152,22 +171,6 @@ impl SqliteStore { Ok(()) } - - pub(super) fn apply_nullifiers( - conn: &mut Connection, - note_updates: &NoteUpdates, - transactions_to_discard: &[TransactionId], - ) -> Result<(), StoreError> { - let tx = conn.transaction()?; - - apply_note_updates_tx(&tx, note_updates)?; - - Self::mark_transactions_as_discarded(&tx, transactions_to_discard)?; - - tx.commit()?; - - Ok(()) - } } pub(super) fn add_note_tag_tx(tx: &Transaction<'_>, tag: &NoteTagRecord) -> Result<(), StoreError> { diff --git a/crates/rust-client/src/store/web_store/js/sync.js b/crates/rust-client/src/store/web_store/js/sync.js index 563a75c20..3a7cf614e 100644 --- a/crates/rust-client/src/store/web_store/js/sync.js +++ b/crates/rust-client/src/store/web_store/js/sync.js @@ -80,8 +80,9 @@ export async function removeNoteTag(tag, sourceNoteId, sourceAccountId) { export async function applyStateSync( blockNum, - blockHeader, - chainMmrPeaks, + newBlockHeadersAsFlattenedVec, + newBlockNums, + chainMmrPeaksAsFlattenedVec, hasClientNotes, nodeIndexes, nodes, @@ -89,6 +90,11 @@ export async function applyStateSync( committedTransactionIds, transactionBlockNums ) { + const newBlockHeaders = reconstructFlattenedVec( + newBlockHeadersAsFlattenedVec + ); + const chainMmrPeaks = reconstructFlattenedVec(chainMmrPeaksAsFlattenedVec); + return db.transaction( "rw", stateSync, @@ -100,13 +106,15 @@ export async function applyStateSync( tags, async (tx) => { await updateSyncHeight(tx, blockNum); - await updateBlockHeader( - tx, - blockNum, - blockHeader, - chainMmrPeaks, - hasClientNotes - ); + for (let i = 0; i < newBlockHeaders.length; i++) { + await updateBlockHeader( + tx, + newBlockNums[i], + newBlockHeaders[i], + chainMmrPeaks[i], + hasClientNotes[i] + ); + } await updateChainMmrNodes(tx, nodeIndexes, nodes); await updateCommittedNoteTags(tx, inputNoteIds); await updateCommittedTransactions( @@ -232,3 +240,17 @@ function uint8ArrayToBase64(bytes) { ); return btoa(binary); } + +// Helper function to reconstruct arrays from flattened data +function reconstructFlattenedVec(flattenedVec) { + const data = flattenedVec.data(); + const lengths = flattenedVec.lengths(); + + let index = 0; + const result = []; + lengths.forEach((length) => { + result.push(data.slice(index, index + length)); + index += length; + }); + return result; +} diff --git a/crates/rust-client/src/store/web_store/mod.rs b/crates/rust-client/src/store/web_store/mod.rs index 451c19cfb..0fb9443db 100644 --- a/crates/rust-client/src/store/web_store/mod.rs +++ b/crates/rust-client/src/store/web_store/mod.rs @@ -14,7 +14,6 @@ use miden_objects::{ block::{BlockHeader, BlockNumber}, crypto::merkle::{InOrderIndex, MmrPeaks}, note::Nullifier, - transaction::TransactionId, Digest, Word, }; use tonic::async_trait; @@ -26,7 +25,6 @@ use super::{ OutputNoteRecord, Store, StoreError, TransactionFilter, }; use crate::{ - note::NoteUpdates, sync::{NoteTagRecord, StateSyncUpdate}, transaction::{TransactionRecord, TransactionStoreUpdate}, }; @@ -87,14 +85,6 @@ impl Store for WebStore { self.apply_state_sync(state_sync_update).await } - async fn apply_nullifiers( - &self, - note_updates: NoteUpdates, - transactions_to_discard: Vec, - ) -> Result<(), StoreError> { - self.apply_nullifiers(note_updates, transactions_to_discard).await - } - // TRANSACTIONS // -------------------------------------------------------------------------------------------- diff --git a/crates/rust-client/src/store/web_store/sync/flattened_vec.rs b/crates/rust-client/src/store/web_store/sync/flattened_vec.rs index d56bdf918..d39f79d20 100644 --- a/crates/rust-client/src/store/web_store/sync/flattened_vec.rs +++ b/crates/rust-client/src/store/web_store/sync/flattened_vec.rs @@ -5,7 +5,7 @@ use wasm_bindgen::prelude::*; #[wasm_bindgen] pub struct FlattenedU8Vec { data: Vec, - lengths: Vec, + lengths: Vec, } #[wasm_bindgen] @@ -14,18 +14,18 @@ impl FlattenedU8Vec { self.data.clone() } - pub fn lengths(&self) -> Vec { + pub fn lengths(&self) -> Vec { self.lengths.clone() } - pub fn num_inner_vecs(&self) -> u32 { - self.lengths.len() as u32 // The number of inner Vec is the number of lengths + pub fn num_inner_vecs(&self) -> usize { + self.lengths.len() // The number of inner Vec is the number of lengths } } pub fn flatten_nested_u8_vec(nested_vec: Vec>) -> FlattenedU8Vec { // Calculate the lengths of each inner Vec before flattening - let lengths: Vec = nested_vec.iter().map(|v| v.len() as u32).collect(); + let lengths: Vec = nested_vec.iter().map(Vec::len).collect(); // Now you can flatten the Vec> into a single Vec let data: Vec = nested_vec.into_iter().flatten().collect(); diff --git a/crates/rust-client/src/store/web_store/sync/js_bindings.rs b/crates/rust-client/src/store/web_store/sync/js_bindings.rs index 5c70ef8ed..05c630bb5 100644 --- a/crates/rust-client/src/store/web_store/sync/js_bindings.rs +++ b/crates/rust-client/src/store/web_store/sync/js_bindings.rs @@ -3,6 +3,8 @@ use alloc::{string::String, vec::Vec}; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::{js_sys, wasm_bindgen}; +use super::flattened_vec::FlattenedU8Vec; + // Sync IndexedDB Operations #[wasm_bindgen(module = "/src/store/web_store/js/sync.js")] @@ -29,9 +31,10 @@ extern "C" { #[wasm_bindgen(js_name = applyStateSync)] pub fn idxdb_apply_state_sync( block_num: String, - block_header: Vec, - chain_mmr_peaks: Vec, - has_client_notes: bool, + flattened_new_block_headers: FlattenedU8Vec, + new_block_nums: Vec, + flattened_chain_mmr_peaks: FlattenedU8Vec, + has_client_notes: Vec, serialized_node_ids: Vec, serialized_nodes: Vec, note_tags_to_remove_as_str: Vec, diff --git a/crates/rust-client/src/store/web_store/sync/mod.rs b/crates/rust-client/src/store/web_store/sync/mod.rs index a039a9b5e..12c7764c5 100644 --- a/crates/rust-client/src/store/web_store/sync/mod.rs +++ b/crates/rust-client/src/store/web_store/sync/mod.rs @@ -7,7 +7,6 @@ use miden_objects::{ account::AccountId, block::BlockNumber, note::{NoteId, NoteTag}, - transaction::TransactionId, }; use miden_tx::utils::{Deserializable, Serializable}; use serde_wasm_bindgen::from_value; @@ -20,7 +19,6 @@ use super::{ WebStore, }; use crate::{ - note::NoteUpdates, store::StoreError, sync::{NoteTagRecord, NoteTagSource, StateSyncUpdate}, }; @@ -34,6 +32,9 @@ use js_bindings::{ mod models; use models::{NoteTagIdxdbObject, SyncHeightIdxdbObject}; +mod flattened_vec; +use flattened_vec::flatten_nested_u8_vec; + impl WebStore { pub(crate) async fn get_note_tags(&self) -> Result, StoreError> { let promise = idxdb_get_note_tags(); @@ -108,27 +109,30 @@ impl WebStore { state_sync_update: StateSyncUpdate, ) -> Result<(), StoreError> { let StateSyncUpdate { - block_header, - block_has_relevant_notes, - new_mmr_peaks, - new_authentication_nodes, + block_num, + block_updates, note_updates, - transaction_updates, + transaction_updates, //TODO: Add support for discarded transactions in web store account_updates, - tags_to_remove, } = state_sync_update; - // Serialize data for updating state sync and block header - let block_num_as_str = block_header.block_num().to_string(); - // Serialize data for updating block header - let block_header_as_bytes = block_header.to_bytes(); - let new_mmr_peaks_as_bytes = new_mmr_peaks.peaks().to_vec().to_bytes(); + let mut block_headers_as_bytes = vec![]; + let mut new_mmr_peaks_as_bytes = vec![]; + let mut block_nums_as_str = vec![]; + let mut block_has_relevant_notes = vec![]; + + for (block_header, has_client_notes, mmr_peaks) in block_updates.block_headers() { + block_headers_as_bytes.push(block_header.to_bytes()); + new_mmr_peaks_as_bytes.push(mmr_peaks.peaks().to_vec().to_bytes()); + block_nums_as_str.push(block_header.block_num().to_string()); + block_has_relevant_notes.push(u8::from(*has_client_notes)); + } // Serialize data for updating chain MMR nodes let mut serialized_node_ids = Vec::new(); let mut serialized_nodes = Vec::new(); - for (id, node) in &new_authentication_nodes { + for (id, node) in block_updates.new_authentication_nodes() { let serialized_data = serialize_chain_mmr_node(*id, *node)?; serialized_node_ids.push(serialized_data.id); serialized_nodes.push(serialized_data.node); @@ -139,16 +143,8 @@ impl WebStore { apply_note_updates_tx(¬e_updates).await?; // Tags to remove - let note_tags_to_remove_as_str: Vec = tags_to_remove - .iter() - .filter_map(|tag_record| { - if let NoteTagSource::Note(note_id) = tag_record.source { - Some(note_id.to_hex()) - } else { - None - } - }) - .collect(); + let note_tags_to_remove_as_str: Vec = + note_updates.committed_input_notes().map(|note| note.id().to_hex()).collect(); // Serialize data for updating committed transactions let transactions_to_commit_block_nums_as_str = transaction_updates @@ -168,14 +164,24 @@ impl WebStore { update_account(&account.clone()).await.unwrap(); } - for (account_id, _) in account_updates.mismatched_private_accounts() { + for (account_id, digest) in account_updates.mismatched_private_accounts() { + // Mismatched digests may be due to stale network data. If the mismatched digest is + // tracked in the db and corresponds to the mismatched account, it means we + // got a past update and shouldn't lock the account. + if let Some(account) = self.get_account_header_by_hash(*digest).await? { + if account.id() == *account_id { + continue; + } + } + lock_account(account_id).await.unwrap(); } let promise = idxdb_apply_state_sync( - block_num_as_str, - block_header_as_bytes, - new_mmr_peaks_as_bytes, + block_num.to_string(), + flatten_nested_u8_vec(block_headers_as_bytes), + block_nums_as_str, + flatten_nested_u8_vec(new_mmr_peaks_as_bytes), block_has_relevant_notes, serialized_node_ids, serialized_nodes, @@ -187,12 +193,4 @@ impl WebStore { Ok(()) } - - pub(super) async fn apply_nullifiers( - &self, - note_updates: NoteUpdates, - _transactions_to_discard: Vec, - ) -> Result<(), StoreError> { - apply_note_updates_tx(¬e_updates).await - } } diff --git a/crates/rust-client/src/sync/block_header.rs b/crates/rust-client/src/sync/block_header.rs index 88bae531b..83e70d546 100644 --- a/crates/rust-client/src/sync/block_header.rs +++ b/crates/rust-client/src/sync/block_header.rs @@ -1,6 +1,6 @@ use alloc::vec::Vec; -use crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr}; +use crypto::merkle::{InOrderIndex, MmrPeaks, PartialMmr}; use miden_objects::{ block::{BlockHeader, BlockNumber}, crypto::{self, merkle::MerklePath, rand::FeltRng}, @@ -8,9 +8,7 @@ use miden_objects::{ }; use tracing::warn; -use super::NoteUpdates; use crate::{ - note::NoteScreener, store::{ChainMmrNodeFilter, NoteFilter, StoreError}, Client, ClientError, }; @@ -19,8 +17,8 @@ use crate::{ impl Client { /// Updates committed notes with no MMR data. These could be notes that were /// imported with an inclusion proof, but its block header isn't tracked. - pub(crate) async fn update_mmr_data(&mut self) -> Result<(), ClientError> { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + pub(crate) async fn update_mmr_data(&self) -> Result<(), ClientError> { + let mut current_partial_mmr = self.build_current_partial_mmr().await?; let mut changed_notes = vec![]; for mut note in self.store.get_input_notes(NoteFilter::Unverified).await? { @@ -73,41 +71,13 @@ impl Client { // HELPERS // -------------------------------------------------------------------------------------------- - /// Checks the relevance of the block by verifying if any of the input notes in the block are - /// relevant to the client. If any of the notes are relevant, the function returns `true`. - pub(crate) async fn check_block_relevance( - &mut self, - committed_notes: &NoteUpdates, - ) -> Result { - // We'll only do the check for either incoming public notes or expected input notes as - // output notes are not really candidates to be consumed here. - - let note_screener = NoteScreener::new(self.store.clone()); - - // Find all relevant Input Notes using the note checker - for input_note in committed_notes.updated_input_notes() { - if !note_screener - .check_relevance(&input_note.try_into().map_err(ClientError::NoteRecordError)?) - .await? - .is_empty() - { - return Ok(true); - } - } - - Ok(false) - } - /// Builds the current view of the chain's [`PartialMmr`]. Because we want to add all new /// authentication nodes that could come from applying the MMR updates, we need to track all /// known leaves thus far. /// /// As part of the syncing process, we add the current block number so we don't need to /// track it here. - pub(crate) async fn build_current_partial_mmr( - &self, - include_current_block: bool, - ) -> Result { + pub(crate) async fn build_current_partial_mmr(&self) -> Result { let current_block_num = self.store.get_sync_height().await?; let tracked_nodes = self.store.get_chain_mmr_nodes(ChainMmrNodeFilter::All).await?; @@ -129,15 +99,13 @@ impl Client { let mut current_partial_mmr = PartialMmr::from_parts(current_peaks, tracked_nodes, track_latest); - if include_current_block { - let (current_block, has_client_notes) = self - .store - .get_block_header_by_num(current_block_num) - .await? - .expect("Current block should be in the store"); + let (current_block, has_client_notes) = self + .store + .get_block_header_by_num(current_block_num) + .await? + .expect("Current block should be in the store"); - current_partial_mmr.add(current_block.hash(), has_client_notes); - } + current_partial_mmr.add(current_block.hash(), has_client_notes); Ok(current_partial_mmr) } @@ -206,7 +174,7 @@ impl Client { return self.ensure_genesis_in_place().await; } - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.build_current_partial_mmr().await?; let anchor_block = self .get_and_store_authenticated_block(epoch_block_number, &mut current_partial_mmr) .await?; @@ -260,29 +228,3 @@ fn adjust_merkle_path_for_forest( path_nodes } - -/// Applies changes to the Mmr structure, storing authentication nodes for leaves we track -/// and returns the updated [`PartialMmr`]. -pub(crate) fn apply_mmr_changes( - current_partial_mmr: PartialMmr, - mmr_delta: MmrDelta, - current_block_header: &BlockHeader, - current_block_has_relevant_notes: bool, -) -> Result<(MmrPeaks, Vec<(InOrderIndex, Digest)>), StoreError> { - let mut partial_mmr: PartialMmr = current_partial_mmr; - - // First, apply curent_block to the Mmr - let new_authentication_nodes = partial_mmr - .add(current_block_header.hash(), current_block_has_relevant_notes) - .into_iter(); - - // Apply the Mmr delta to bring Mmr to forest equal to chain tip - let new_authentication_nodes: Vec<(InOrderIndex, Digest)> = partial_mmr - .apply(mmr_delta) - .map_err(StoreError::MmrError)? - .into_iter() - .chain(new_authentication_nodes) - .collect(); - - Ok((partial_mmr.peaks(), new_authentication_nodes)) -} diff --git a/crates/rust-client/src/sync/mod.rs b/crates/rust-client/src/sync/mod.rs index 8d7646339..96f7b4118 100644 --- a/crates/rust-client/src/sync/mod.rs +++ b/crates/rust-client/src/sync/mod.rs @@ -54,119 +54,34 @@ //! `committed_note_updates` and `consumed_note_updates`) to understand how the sync data is //! processed and applied to the local store. -use alloc::{collections::BTreeMap, vec::Vec}; +use alloc::{boxed::Box, vec::Vec}; use core::cmp::max; use miden_objects::{ - account::{Account, AccountHeader, AccountId}, - block::{BlockHeader, BlockNumber}, + account::AccountId, + block::BlockNumber, crypto::rand::FeltRng, - note::{NoteId, NoteInclusionProof, NoteTag, Nullifier}, + note::{NoteId, NoteTag}, transaction::TransactionId, - Digest, }; -use tracing::info; -use crate::{ - note::NoteUpdates, - rpc::domain::{ - note::CommittedNote, nullifier::NullifierUpdate, transaction::TransactionUpdate, - }, - store::{AccountUpdates, InputNoteRecord, NoteFilter, OutputNoteRecord, TransactionFilter}, - transaction::{TransactionStatus, TransactionUpdates}, - Client, ClientError, -}; +use crate::{store::NoteFilter, Client, ClientError}; mod block_header; -use block_header::apply_mmr_changes; mod tag; pub use tag::{NoteTagRecord, NoteTagSource}; -mod state_sync_update; -pub use state_sync_update::StateSyncUpdate; +mod state_sync; +pub use state_sync::{on_note_received, OnNoteReceived, StateSync}; -/// Contains stats about the sync operation. -pub struct SyncSummary { - /// Block number up to which the client has been synced. - pub block_num: BlockNumber, - /// IDs of tracked notes that received inclusion proofs. - pub committed_notes: Vec, - /// IDs of notes that have been consumed. - pub consumed_notes: Vec, - /// IDs of on-chain accounts that have been updated. - pub updated_accounts: Vec, - /// IDs of private accounts that have been locked. - pub locked_accounts: Vec, - /// IDs of committed transactions. - pub committed_transactions: Vec, -} - -impl SyncSummary { - pub fn new( - block_num: BlockNumber, - committed_notes: Vec, - consumed_notes: Vec, - updated_accounts: Vec, - locked_accounts: Vec, - committed_transactions: Vec, - ) -> Self { - Self { - block_num, - committed_notes, - consumed_notes, - updated_accounts, - locked_accounts, - committed_transactions, - } - } - - pub fn new_empty(block_num: BlockNumber) -> Self { - Self { - block_num, - committed_notes: vec![], - consumed_notes: vec![], - updated_accounts: vec![], - locked_accounts: vec![], - committed_transactions: vec![], - } - } - - pub fn is_empty(&self) -> bool { - self.committed_notes.is_empty() - && self.consumed_notes.is_empty() - && self.updated_accounts.is_empty() - && self.locked_accounts.is_empty() - && self.committed_transactions.is_empty() - } - - pub fn combine_with(&mut self, mut other: Self) { - self.block_num = max(self.block_num, other.block_num); - self.committed_notes.append(&mut other.committed_notes); - self.consumed_notes.append(&mut other.consumed_notes); - self.updated_accounts.append(&mut other.updated_accounts); - self.locked_accounts.append(&mut other.locked_accounts); - self.committed_transactions.append(&mut other.committed_transactions); - } -} - -enum SyncStatus { - SyncedToLastBlock(SyncSummary), - SyncedToBlock(SyncSummary), -} - -impl SyncStatus { - pub fn into_sync_summary(self) -> SyncSummary { - match self { - SyncStatus::SyncedToBlock(summary) | SyncStatus::SyncedToLastBlock(summary) => summary, - } - } -} +mod state_sync_update; +pub use state_sync_update::{AccountUpdates, BlockUpdates, StateSyncUpdate, TransactionUpdates}; // CONSTANTS // ================================================================================================ -/// Client syncronization methods. +// Client syncronization methods. impl Client { // SYNC STATE // -------------------------------------------------------------------------------------------- @@ -176,8 +91,8 @@ impl Client { self.store.get_sync_height().await.map_err(Into::into) } - /// Syncs the client's state with the current state of the Miden network. Returns the block - /// number the client has been synced to. + /// Syncs the client's state with the current state of the Miden network and returns a + /// [`SyncSummary`] corresponding to the local state update. /// /// The sync process is done in multiple steps: /// 1. A request is sent to the node to get the state updates. This request includes tracked @@ -191,35 +106,25 @@ impl Client { /// can be consumed by accounts the client is tracking (this is checked by the /// [`crate::note::NoteScreener`]) /// 5. Transactions are updated with their new states. - /// 6. Tracked public accounts are updated and off-chain accounts are validated against the node + /// 6. Tracked public accounts are updated and private accounts are validated against the node /// state. /// 7. The MMR is updated with the new peaks and authentication nodes. /// 8. All updates are applied to the store to be persisted. pub async fn sync_state(&mut self) -> Result { - let starting_block_num = self.get_sync_height().await?; - _ = self.ensure_genesis_in_place().await?; - let mut total_sync_summary = SyncSummary::new_empty(0.into()); - loop { - let response = self.sync_state_once().await?; - let is_last_block = matches!(response, SyncStatus::SyncedToLastBlock(_)); - total_sync_summary.combine_with(response.into_sync_summary()); - - if is_last_block { - break; - } - } - self.update_mmr_data().await?; - // Sync and apply nullifiers - total_sync_summary.combine_with(self.sync_nullifiers(starting_block_num).await?); - - Ok(total_sync_summary) - } - async fn sync_state_once(&mut self) -> Result { - let current_block_num = self.store.get_sync_height().await?; + let state_sync = StateSync::new( + self.rpc_api.clone(), + Box::new({ + let store_clone = self.store.clone(); + move |committed_note, public_note| { + Box::pin(on_note_received(store_clone.clone(), committed_note, public_note)) + } + }), + ); - let accounts: Vec = self + // Get current state of the client + let accounts = self .store .get_account_headers() .await? @@ -230,75 +135,21 @@ impl Client { let note_tags: Vec = self.store.get_unique_note_tags().await?.into_iter().collect(); - // Send request - let account_ids: Vec = accounts.iter().map(AccountHeader::id).collect(); - let response = self.rpc_api.sync_state(current_block_num, &account_ids, ¬e_tags).await?; - - // We don't need to continue if the chain has not advanced, there are no new changes - if response.block_header.block_num() == current_block_num { - return Ok(SyncStatus::SyncedToLastBlock(SyncSummary::new_empty(current_block_num))); - } - - let (note_updates, tags_to_remove) = self - .committed_note_updates(response.note_inclusions, &response.block_header) + let unspent_input_notes = self.store.get_input_notes(NoteFilter::Unspent).await?; + let unspent_output_notes = self.store.get_output_notes(NoteFilter::Unspent).await?; + + // Get the sync update from the network + let state_sync_update = state_sync + .sync_state( + self.build_current_partial_mmr().await?, + accounts, + note_tags, + unspent_input_notes, + unspent_output_notes, + ) .await?; - let incoming_block_has_relevant_notes = self.check_block_relevance(¬e_updates).await?; - - let transactions_to_commit = self.get_transactions_to_commit(response.transactions).await?; - - let (public_accounts, private_accounts): (Vec<_>, Vec<_>) = - accounts.into_iter().partition(|account_header| account_header.id().is_public()); - - let updated_public_accounts = self - .get_updated_public_accounts(&response.account_hash_updates, &public_accounts) - .await?; - - let mismatched_private_accounts = self - .validate_local_account_hashes(&response.account_hash_updates, &private_accounts) - .await?; - - // Build PartialMmr with current data and apply updates - let (new_peaks, new_authentication_nodes) = { - let current_partial_mmr = self.build_current_partial_mmr(false).await?; - - let (current_block, has_relevant_notes) = self - .store - .get_block_header_by_num(current_block_num) - .await? - .expect("Current block should be in the store"); - - apply_mmr_changes( - current_partial_mmr, - response.mmr_delta, - ¤t_block, - has_relevant_notes, - )? - }; - - // Store summary to return later - let sync_summary = SyncSummary::new( - response.block_header.block_num(), - note_updates.committed_note_ids().into_iter().collect(), - note_updates.consumed_note_ids().into_iter().collect(), - updated_public_accounts.iter().map(Account::id).collect(), - mismatched_private_accounts.iter().map(|(acc_id, _)| *acc_id).collect(), - transactions_to_commit.iter().map(|tx| tx.transaction_id).collect(), - ); - let response_block_num = response.block_header.block_num(); - let state_sync_update = StateSyncUpdate { - block_header: response.block_header, - block_has_relevant_notes: incoming_block_has_relevant_notes, - new_mmr_peaks: new_peaks, - new_authentication_nodes, - note_updates, - transaction_updates: TransactionUpdates::new(transactions_to_commit, vec![]), - account_updates: AccountUpdates::new( - updated_public_accounts, - mismatched_private_accounts, - ), - tags_to_remove, - }; + let sync_summary: SyncSummary = (&state_sync_update).into(); // Apply received and computed updates to the store self.store @@ -306,363 +157,75 @@ impl Client { .await .map_err(ClientError::StoreError)?; - if response.chain_tip == response_block_num { - Ok(SyncStatus::SyncedToLastBlock(sync_summary)) - } else { - Ok(SyncStatus::SyncedToBlock(sync_summary)) - } - } - - // HELPERS - // -------------------------------------------------------------------------------------------- - - async fn sync_nullifiers( - &mut self, - starting_block_num: BlockNumber, - ) -> Result { - // To receive information about added nullifiers, we reduce them to the higher 16 bits - // Note that besides filtering by nullifier prefixes, the node also filters by block number - // (it only returns nullifiers from current_block_num until - // response.block_header.block_num()) - let nullifiers_tags: Vec = self - .store - .get_unspent_input_note_nullifiers() - .await? - .iter() - .map(Nullifier::prefix) - .collect(); - - let nullifiers = self - .rpc_api - .check_nullifiers_by_prefix(&nullifiers_tags, starting_block_num) - .await?; - - // Committed transactions - let committed_transactions = self - .store - .get_transactions(TransactionFilter::All) - .await? - .into_iter() - .filter_map(|tx| { - if let TransactionStatus::Committed(block_num) = tx.transaction_status { - Some(TransactionUpdate { - transaction_id: tx.id, - account_id: tx.account_id, - block_num: block_num.as_u32(), - }) - } else { - None - } - }) - .collect::>(); - - let (consumed_note_updates, transactions_to_discard) = - self.consumed_note_updates(&nullifiers, &committed_transactions).await?; - - // Store summary to return later - let sync_summary = SyncSummary::new( - 0.into(), - consumed_note_updates.committed_note_ids().into_iter().collect(), - consumed_note_updates.consumed_note_ids().into_iter().collect(), - vec![], - vec![], - committed_transactions.iter().map(|tx| tx.transaction_id).collect(), - ); - - // Apply received and computed updates to the store - self.store - .apply_nullifiers(consumed_note_updates, transactions_to_discard) - .await - .map_err(ClientError::StoreError)?; + self.update_mmr_data().await?; Ok(sync_summary) } +} - /// Returns the [`NoteUpdates`] containing new public note and committed input/output notes and - /// a list or note tag records to be removed from the store. - async fn committed_note_updates( - &mut self, - committed_notes: Vec, - block_header: &BlockHeader, - ) -> Result<(NoteUpdates, Vec), ClientError> { - // We'll only pick committed notes that we are tracking as input/output notes. Since the - // sync response contains notes matching either the provided accounts or the provided tag - // we might get many notes when we only care about a few of those. - let relevant_note_filter = - NoteFilter::List(committed_notes.iter().map(CommittedNote::note_id).copied().collect()); - - let mut committed_input_notes: BTreeMap = self - .store - .get_input_notes(relevant_note_filter.clone()) - .await? - .into_iter() - .map(|n| (n.id(), n)) - .collect(); - - let mut committed_output_notes: BTreeMap = self - .store - .get_output_notes(relevant_note_filter) - .await? - .into_iter() - .map(|n| (n.id(), n)) - .collect(); - - let mut new_public_notes = vec![]; - let mut committed_tracked_input_notes = vec![]; - let mut committed_tracked_output_notes = vec![]; - let mut removed_tags = vec![]; - - for committed_note in committed_notes { - let inclusion_proof = NoteInclusionProof::new( - block_header.block_num(), - committed_note.note_index(), - committed_note.merkle_path().clone(), - )?; - - if let Some(mut note_record) = committed_input_notes.remove(committed_note.note_id()) { - // The note belongs to our locally tracked set of input notes - - let inclusion_proof_received = note_record - .inclusion_proof_received(inclusion_proof.clone(), committed_note.metadata())?; - let block_header_received = note_record.block_header_received(block_header)?; - - removed_tags.push((¬e_record).try_into()?); - - if inclusion_proof_received || block_header_received { - committed_tracked_input_notes.push(note_record); - } - } - - if let Some(mut note_record) = committed_output_notes.remove(committed_note.note_id()) { - // The note belongs to our locally tracked set of output notes - - if note_record.inclusion_proof_received(inclusion_proof.clone())? { - committed_tracked_output_notes.push(note_record); - } - } - - if !committed_input_notes.contains_key(committed_note.note_id()) - && !committed_output_notes.contains_key(committed_note.note_id()) - { - // The note is public and we are not tracking it, push to the list of IDs to query - new_public_notes.push(*committed_note.note_id()); - } - } - - // Query the node for input note data and build the entities - let new_public_notes = - self.fetch_public_note_details(&new_public_notes, block_header).await?; - - Ok(( - NoteUpdates::new( - [new_public_notes, committed_tracked_input_notes].concat(), - committed_tracked_output_notes, - ), - removed_tags, - )) - } - - /// Returns the [`NoteUpdates`] containing consumed input/output notes and a list of IDs of the - /// transactions that were discarded. - async fn consumed_note_updates( - &mut self, - nullifiers: &[NullifierUpdate], - committed_transactions: &[TransactionUpdate], - ) -> Result<(NoteUpdates, Vec), ClientError> { - let nullifier_filter = NoteFilter::Nullifiers( - nullifiers.iter().map(|nullifier_update| nullifier_update.nullifier).collect(), - ); - - let mut consumed_input_notes: BTreeMap = self - .store - .get_input_notes(nullifier_filter.clone()) - .await? - .into_iter() - .map(|n| (n.nullifier(), n)) - .collect(); - - let mut consumed_output_notes: BTreeMap = self - .store - .get_output_notes(nullifier_filter) - .await? - .into_iter() - .map(|n| { - ( - n.nullifier() - .expect("Output notes returned by this query should have nullifiers"), - n, - ) - }) - .collect(); - - let mut consumed_tracked_input_notes = vec![]; - let mut consumed_tracked_output_notes = vec![]; - - for transaction_update in committed_transactions { - let transaction_nullifiers: Vec = consumed_input_notes - .iter() - .filter_map(|(nullifier, note_record)| { - if note_record.is_processing() - && note_record.consumer_transaction_id() - == Some(&transaction_update.transaction_id) - { - Some(nullifier) - } else { - None - } - }) - .copied() - .collect(); - - for nullifier in transaction_nullifiers { - if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) { - if input_note_record.transaction_committed( - transaction_update.transaction_id, - transaction_update.block_num, - )? { - consumed_tracked_input_notes.push(input_note_record); - } - } - } - } - - // Nullified notes - let mut discarded_transactions = vec![]; - for nullifier_update in nullifiers { - let nullifier = nullifier_update.nullifier; - let block_num = nullifier_update.block_num; - - if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) { - if input_note_record.is_processing() { - discarded_transactions.push( - *input_note_record - .consumer_transaction_id() - .expect("Processing note should have consumer transaction id"), - ); - } +// SYNC SUMMARY +// ================================================================================================ - if input_note_record.consumed_externally(nullifier, block_num)? { - consumed_tracked_input_notes.push(input_note_record); - } - } +/// Contains stats about the sync operation. +pub struct SyncSummary { + /// Block number up to which the client has been synced. + pub block_num: BlockNumber, + /// IDs of notes that have been committed. + pub committed_notes: Vec, + /// IDs of notes that have been consumed. + pub consumed_notes: Vec, + /// IDs of on-chain accounts that have been updated. + pub updated_accounts: Vec, + /// IDs of private accounts that have been locked. + pub locked_accounts: Vec, + /// IDs of committed transactions. + pub committed_transactions: Vec, +} - if let Some(mut output_note_record) = consumed_output_notes.remove(&nullifier) { - if output_note_record.nullifier_received(nullifier, block_num)? { - consumed_tracked_output_notes.push(output_note_record); - } - } +impl SyncSummary { + pub fn new( + block_num: BlockNumber, + committed_notes: Vec, + consumed_notes: Vec, + updated_accounts: Vec, + locked_accounts: Vec, + committed_transactions: Vec, + ) -> Self { + Self { + block_num, + committed_notes, + consumed_notes, + updated_accounts, + locked_accounts, + committed_transactions, } - - Ok(( - NoteUpdates::new(consumed_tracked_input_notes, consumed_tracked_output_notes), - discarded_transactions, - )) } - /// Queries the node for all received notes that aren't being locally tracked in the client. - /// - /// The client can receive metadata for private notes that it's not tracking. In this case, - /// notes are ignored for now as they become useless until details are imported. - async fn fetch_public_note_details( - &mut self, - query_notes: &[NoteId], - block_header: &BlockHeader, - ) -> Result, ClientError> { - if query_notes.is_empty() { - return Ok(vec![]); - } - info!("Getting note details for notes that are not being tracked."); - - let mut return_notes = self - .rpc_api - .get_public_note_records(query_notes, self.store.get_current_timestamp()) - .await?; - - for note in &mut return_notes { - note.block_header_received(block_header)?; + pub fn new_empty(block_num: BlockNumber) -> Self { + Self { + block_num, + committed_notes: vec![], + consumed_notes: vec![], + updated_accounts: vec![], + locked_accounts: vec![], + committed_transactions: vec![], } - - Ok(return_notes) } - /// Extracts information about transactions for uncommitted transactions that the client is - /// tracking from the received [`SyncStateResponse`]. - async fn get_transactions_to_commit( - &self, - mut transactions: Vec, - ) -> Result, ClientError> { - // Get current uncommitted transactions - let uncommitted_transaction_ids = self - .store - .get_transactions(TransactionFilter::Uncomitted) - .await? - .into_iter() - .map(|tx| tx.id) - .collect::>(); - - transactions.retain(|transaction_update| { - uncommitted_transaction_ids.contains(&transaction_update.transaction_id) - }); - - Ok(transactions) - } - - async fn get_updated_public_accounts( - &mut self, - account_updates: &[(AccountId, Digest)], - current_public_accounts: &[AccountHeader], - ) -> Result, ClientError> { - let mut mismatched_public_accounts = vec![]; - - for (id, hash) in account_updates { - // check if this updated account is tracked by the client - if let Some(account) = current_public_accounts - .iter() - .find(|acc| *id == acc.id() && *hash != acc.hash()) - { - mismatched_public_accounts.push(account); - } - } - - self.rpc_api - .get_updated_public_accounts(&mismatched_public_accounts) - .await - .map_err(ClientError::RpcError) + pub fn is_empty(&self) -> bool { + self.committed_notes.is_empty() + && self.consumed_notes.is_empty() + && self.updated_accounts.is_empty() + && self.locked_accounts.is_empty() + && self.committed_transactions.is_empty() } - /// Validates account hash updates and returns a vector with all the private account - /// mismatches. - /// - /// Private account mismatches happen when the hash account of the local tracked account - /// doesn't match the hash account of the account in the node. This would be an anomaly and may - /// happen for two main reasons: - /// - A different client made a transaction with the account, changing its state. - /// - The local transaction that modified the local state didn't go through, rendering the local - /// account state outdated. - async fn validate_local_account_hashes( - &mut self, - account_updates: &[(AccountId, Digest)], - current_private_accounts: &[AccountHeader], - ) -> Result, ClientError> { - let mut mismatched_accounts = vec![]; - - for (remote_account_id, remote_account_hash) in account_updates { - // ensure that if we track that account, it has the same hash - let mismatched_account = current_private_accounts - .iter() - .find(|acc| *remote_account_id == acc.id() && *remote_account_hash != acc.hash()); - - // Private accounts should always have the latest known state. If we receive a stale - // update we ignore it. - if mismatched_account.is_some() { - let account_by_hash = - self.store.get_account_header_by_hash(*remote_account_hash).await?; - - if account_by_hash.is_none() { - mismatched_accounts.push((*remote_account_id, *remote_account_hash)); - } - } - } - Ok(mismatched_accounts) + pub fn combine_with(&mut self, mut other: Self) { + self.block_num = max(self.block_num, other.block_num); + self.committed_notes.append(&mut other.committed_notes); + self.consumed_notes.append(&mut other.consumed_notes); + self.updated_accounts.append(&mut other.updated_accounts); + self.locked_accounts.append(&mut other.locked_accounts); + self.committed_transactions.append(&mut other.committed_transactions); } } diff --git a/crates/rust-client/src/sync/state_sync.rs b/crates/rust-client/src/sync/state_sync.rs new file mode 100644 index 000000000..d21d19c95 --- /dev/null +++ b/crates/rust-client/src/sync/state_sync.rs @@ -0,0 +1,439 @@ +use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use core::{future::Future, pin::Pin}; + +use miden_objects::{ + account::{Account, AccountHeader, AccountId}, + block::{BlockHeader, BlockNumber}, + crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr}, + note::{NoteId, NoteTag}, + Digest, +}; +use tracing::info; + +use super::{AccountUpdates, BlockUpdates, StateSyncUpdate, TransactionUpdates}; +use crate::{ + note::{NoteScreener, NoteUpdates}, + rpc::{domain::note::CommittedNote, NodeRpcClient}, + store::{InputNoteRecord, NoteFilter, OutputNoteRecord, Store, StoreError}, + ClientError, +}; + +// SYNC CALLBACKS +// ================================================================================================ + +/// Callback that gets executed when a new note inclusion is received as part of the sync response. +/// It receives the committed note received from the network and the input note state and an +/// optional note record that corresponds to the state of the note in the network (only if the note +/// is public). +/// +/// It returns a boolean indicating if the received note update is relevant. +/// If the return value is `false`, it gets discarded. If it is `true`, the update gets committed to +/// the client's store. +pub type OnNoteReceived = Box< + dyn Fn( + CommittedNote, + Option, + ) -> Pin>>>, +>; + +// STATE SYNC +// ================================================================================================ + +/// The state sync components encompasses the client's sync logic. +/// +/// When created it receives callbacks that will be executed when a new note inclusion or a +/// nullifier is received in the sync response. +/// +/// +/// current state of the client's relevant elements (block, accounts, +/// notes, etc). It is then used to requset updates from the node and apply them to the relevant +/// elements. The updates are then returned and can be applied to the store to persist the changes. +pub struct StateSync { + /// The RPC client used to communicate with the node. + rpc_api: Arc, + /// Callback to be executed when a new note inclusion is received. + on_note_received: OnNoteReceived, +} + +impl StateSync { + /// Creates a new instance of the state sync component. + /// + /// # Arguments + /// + /// * `rpc_api` - The RPC client used to communicate with the node. + /// * `on_note_received` - A callback to be executed when a new note inclusion is received. + /// * `on_nullifier_received` - A callback to be executed when a nullifier is received. + pub fn new(rpc_api: Arc, on_note_received: OnNoteReceived) -> Self { + Self { rpc_api, on_note_received } + } + + /// Syncs the state of the client with the chain tip of the node, returning the updates that + /// should be applied to the store. + /// + /// During the sync process, the client will go through the following steps: + /// 1. A request is sent to the node to get the state updates. This request includes tracked + /// account IDs and the tags of notes that might have changed or that might be of interest to + /// the client. + /// 2. A response is received with the current state of the network. The response includes + /// information about new/committed/consumed notes, updated accounts, and committed + /// transactions. + /// 3. Tracked public accounts are updated and private accounts are validated against the node + /// state. + /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified + /// during the sync processing. + /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the + /// [`OnNoteReceived`] callback. + /// 6. Transactions are updated with their new states. Transactions might be committed or + /// discarded. + /// 7. The MMR is updated with the new peaks and authentication nodes. + /// + /// # Arguments + /// * `current_partial_mmr` - The current partial MMR. + /// * `accounts` - All the headers of tracked accounts. + /// * `note_tags` - The note tags to be used in the sync state request. + /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client. + /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client. + pub async fn sync_state( + mut self, + mut current_partial_mmr: PartialMmr, + accounts: Vec, + note_tags: Vec, + unspent_input_notes: Vec, + unspent_output_notes: Vec, + ) -> Result { + let block_num = (u32::try_from(current_partial_mmr.num_leaves() - 1) + .expect("The number of leaves in the MMR should be greater than 0 and less than 2^32")) + .into(); + + let mut state_sync_update = StateSyncUpdate { + block_num, + note_updates: NoteUpdates::new(unspent_input_notes, unspent_output_notes), + ..Default::default() + }; + + loop { + if !self + .sync_state_step( + &mut state_sync_update, + &mut current_partial_mmr, + &accounts, + ¬e_tags, + ) + .await? + { + break; + } + } + + self.sync_nullifiers(&mut state_sync_update, block_num).await?; + + Ok(state_sync_update) + } + + /// Executes a single step of the state sync process, returning `true` if the client should + /// continue syncing and `false` if the client has reached the chain tip. + /// + /// A step in this context means a single request to the node to get the next relevant block and + /// the changes that happened in it. This block may not be the last one in the chain and + /// the client may need to call this method multiple times until it reaches the chain tip. + /// + /// The `sync_state_update` field of the struct will be updated with the new changes from this + /// step. + async fn sync_state_step( + &mut self, + state_sync_update: &mut StateSyncUpdate, + current_partial_mmr: &mut PartialMmr, + accounts: &[AccountHeader], + note_tags: &[NoteTag], + ) -> Result { + let account_ids: Vec = accounts.iter().map(AccountHeader::id).collect(); + + let response = self + .rpc_api + .sync_state(state_sync_update.block_num, &account_ids, note_tags) + .await?; + + // We don't need to continue if the chain has not advanced, there are no new changes + if response.block_header.block_num() == state_sync_update.block_num { + return Ok(false); + } + + let new_block_num = response.block_header.block_num(); + state_sync_update.block_num = new_block_num; + + let account_updates = + self.account_state_sync(accounts, &response.account_hash_updates).await?; + + state_sync_update.account_updates.extend(account_updates); + + // Track the transaction updates for transactions that were committed. Some of these might + // be tracked by the client and need to be marked as committed. + state_sync_update + .transaction_updates + .extend(TransactionUpdates::new(response.transactions, vec![])); + + let found_relevant_note = self + .note_state_sync( + &mut state_sync_update.note_updates, + response.note_inclusions, + &response.block_header, + ) + .await?; + + let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes( + &response.block_header, + found_relevant_note, + current_partial_mmr, + response.mmr_delta, + )?; + + state_sync_update.block_updates.extend(BlockUpdates::new( + vec![(response.block_header, found_relevant_note, new_mmr_peaks)], + new_authentication_nodes, + )); + + if response.chain_tip == new_block_num { + Ok(false) + } else { + Ok(true) + } + } + + // HELPERS + // -------------------------------------------------------------------------------------------- + + /// Compares the state of tracked accounts with the updates received from the node. The method + /// updates the `state_sync_update` field with the details of the accounts that need to be + /// updated. + /// + /// The account updates might include: + /// * Public accounts that have been updated in the node. + /// * Private accounts that have been marked as mismatched because the current hash doesn't + /// match the one received from the node. The client will need to handle these cases as they + /// could be a stale account state or a reason to lock the account. + async fn account_state_sync( + &mut self, + accounts: &[AccountHeader], + account_hash_updates: &[(AccountId, Digest)], + ) -> Result { + let (public_accounts, private_accounts): (Vec<_>, Vec<_>) = + accounts.iter().partition(|account_header| account_header.id().is_public()); + + let updated_public_accounts = + self.get_updated_public_accounts(account_hash_updates, &public_accounts).await?; + + let mismatched_private_accounts = account_hash_updates + .iter() + .filter(|(account_id, digest)| { + private_accounts + .iter() + .any(|account| account.id() == *account_id && &account.hash() != digest) + }) + .copied() + .collect::>(); + + Ok(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts)) + } + + /// Queries the node for the latest state of the public accounts that don't match the current + /// state of the client. + async fn get_updated_public_accounts( + &self, + account_updates: &[(AccountId, Digest)], + current_public_accounts: &[&AccountHeader], + ) -> Result, ClientError> { + let mut mismatched_public_accounts = vec![]; + + for (id, hash) in account_updates { + // check if this updated account state is tracked by the client + if let Some(account) = current_public_accounts + .iter() + .find(|acc| *id == acc.id() && *hash != acc.hash()) + { + mismatched_public_accounts.push(*account); + } + } + + self.rpc_api + .get_updated_public_accounts(&mismatched_public_accounts) + .await + .map_err(ClientError::RpcError) + } + + /// Applies the changes received from the sync response to the notes and transactions tracked + /// by the client and updates the `note_updates` accordingly. + /// + /// This method uses the callbacks provided to the [`StateSync`] component to check if the + /// updates received are relevant to the client. + /// + /// The note updates might include: + /// * New notes that we received from the node and might be relevant to the client. + /// * Tracked expected notes that were committed in the block. + /// * Tracked notes that were being processed by a transaction that got committed. + /// * Tracked notes that were nullified by an external transaction. + async fn note_state_sync( + &mut self, + note_updates: &mut NoteUpdates, + note_inclusions: Vec, + block_header: &BlockHeader, + ) -> Result { + let public_note_ids: Vec = note_inclusions + .iter() + .filter_map(|note| (!note.metadata().is_private()).then_some(*note.note_id())) + .collect(); + + let mut found_relevant_note = false; + + // Process note inclusions + let new_public_notes = + Arc::new(self.fetch_public_note_details(&public_note_ids, block_header).await?); + for committed_note in note_inclusions { + let public_note = new_public_notes + .iter() + .find(|note| ¬e.id() == committed_note.note_id()) + .cloned(); + if (self.on_note_received)(committed_note.clone(), public_note.clone()).await? { + found_relevant_note = true; + + if let Some(public_note) = public_note { + note_updates.insert_updates(Some(public_note), None); + } + + note_updates + .apply_committed_note_state_transitions(&committed_note, block_header)?; + } + } + + Ok(found_relevant_note) + } + + /// Queries the node for all received notes that aren't being locally tracked in the client. + /// + /// The client can receive metadata for private notes that it's not tracking. In this case, + /// notes are ignored for now as they become useless until details are imported. + async fn fetch_public_note_details( + &self, + query_notes: &[NoteId], + block_header: &BlockHeader, + ) -> Result, ClientError> { + if query_notes.is_empty() { + return Ok(vec![]); + } + info!("Getting note details for notes that are not being tracked."); + + let mut return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?; + + for note in &mut return_notes { + note.block_header_received(block_header)?; + } + + Ok(return_notes) + } + + /// Collects the nullifier tags for the notes that were updated in the sync response and uses + /// the `check_nullifiers_by_prefix` endpoint to check if there are new nullifiers for these + /// notes. It then processes the nullifiers to apply the state transitions on the note updates. + /// + /// The `state_sync_update` field will be updated to track the new discarded transactions. + async fn sync_nullifiers( + &self, + state_sync_update: &mut StateSyncUpdate, + current_block_num: BlockNumber, + ) -> Result<(), ClientError> { + // To receive information about added nullifiers, we reduce them to the higher 16 bits + // Note that besides filtering by nullifier prefixes, the node also filters by block number + // (it only returns nullifiers from current_block_num until + // response.block_header.block_num()) + + // Check for new nullifiers for input notes that were updated + let nullifiers_tags: Vec = state_sync_update + .note_updates + .updated_input_notes() + .map(|note| note.nullifier().prefix()) + .collect(); + + let new_nullifiers = self + .rpc_api + .check_nullifiers_by_prefix(&nullifiers_tags, current_block_num) + .await?; + + // Process nullifiers and track the updates of local tracked transactions that were + // discarded because the notes that they were processing were nullified by an + // another transaction. + let mut discarded_transactions = vec![]; + + for nullifier_update in new_nullifiers { + let discarded_transaction = + state_sync_update.note_updates.apply_nullifiers_state_transitions( + &nullifier_update, + state_sync_update.transaction_updates.committed_transactions(), + )?; + + if let Some(transaction_id) = discarded_transaction { + discarded_transactions.push(transaction_id); + } + } + + let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions); + state_sync_update.transaction_updates.extend(transaction_updates); + + Ok(()) + } +} + +// HELPERS +// ================================================================================================ + +/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the +/// authentication nodes for leaves we track. +fn apply_mmr_changes( + new_block: &BlockHeader, + new_block_has_relevant_notes: bool, + current_partial_mmr: &mut PartialMmr, + mmr_delta: MmrDelta, +) -> Result<(MmrPeaks, Vec<(InOrderIndex, Digest)>), ClientError> { + // Apply the MMR delta to bring MMR to forest equal to chain tip + let mut new_authentication_nodes: Vec<(InOrderIndex, Digest)> = + current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?; + + let new_peaks = current_partial_mmr.peaks(); + + new_authentication_nodes + .append(&mut current_partial_mmr.add(new_block.hash(), new_block_has_relevant_notes)); + + Ok((new_peaks, new_authentication_nodes)) +} + +// DEFAULT CALLBACK IMPLEMENTATIONS +// ================================================================================================ + +/// Default implementation of the [`OnNoteReceived`] callback. It queries the store for the +/// committed note to check if it's relevant. If the note wasn't being tracked but it came in the +/// sync response it may be a new public note, in that case we use the [`NoteScreener`] to check its +/// relevance. +pub async fn on_note_received( + store: Arc, + committed_note: CommittedNote, + public_note: Option, +) -> Result { + let note_id = *committed_note.note_id(); + let note_screener = NoteScreener::new(store.clone()); + + if !store.get_input_notes(NoteFilter::Unique(note_id)).await?.is_empty() + || !store.get_output_notes(NoteFilter::Unique(note_id)).await?.is_empty() + { + // The note is being tracked by the client so it is relevant + Ok(true) + } else if let Some(public_note) = public_note { + // The note is not being tracked by the client and is public so we can screen it + let new_note_relevance = note_screener + .check_relevance(&public_note.try_into().expect("Public notes should contain metadata")) + .await?; + + Ok(!new_note_relevance.is_empty()) + } else { + // The note is not being tracked by the client and is private so we can't determine if it + // is relevant + Ok(false) + } +} diff --git a/crates/rust-client/src/sync/state_sync_update.rs b/crates/rust-client/src/sync/state_sync_update.rs index f5f887c6f..7ac455c95 100644 --- a/crates/rust-client/src/sync/state_sync_update.rs +++ b/crates/rust-client/src/sync/state_sync_update.rs @@ -1,44 +1,38 @@ use alloc::vec::Vec; use miden_objects::{ - account::Account, - block::BlockHeader, + account::AccountId, + block::{BlockHeader, BlockNumber}, crypto::merkle::{InOrderIndex, MmrPeaks}, + transaction::TransactionId, Digest, }; -use super::{NoteTagRecord, SyncSummary}; -use crate::{note::NoteUpdates, store::AccountUpdates, transaction::TransactionUpdates}; +use super::SyncSummary; +use crate::{account::Account, note::NoteUpdates, rpc::domain::transaction::TransactionUpdate}; // STATE SYNC UPDATE // ================================================================================================ /// Contains all information needed to apply the update in the store after syncing with the node. +#[derive(Default)] pub struct StateSyncUpdate { - /// The new block header, returned as part of the - /// [`StateSyncInfo`](crate::rpc::domain::sync::StateSyncInfo) - pub block_header: BlockHeader, - /// Whether the block header has notes relevant to the client. - pub block_has_relevant_notes: bool, - /// New MMR peaks for the locally tracked MMR of the blockchain. - pub new_mmr_peaks: MmrPeaks, - /// New authentications nodes that are meant to be stored in order to authenticate block - /// headers. - pub new_authentication_nodes: Vec<(InOrderIndex, Digest)>, + /// The block number of the last block that was synced. + pub block_num: BlockNumber, + /// New blocks and authentication nodes. + pub block_updates: BlockUpdates, /// New and updated notes to be upserted in the store. pub note_updates: NoteUpdates, /// Committed and discarded transactions after the sync. pub transaction_updates: TransactionUpdates, /// Public account updates and mismatched private accounts after the sync. pub account_updates: AccountUpdates, - /// Tag records that are no longer relevant. - pub tags_to_remove: Vec, } impl From<&StateSyncUpdate> for SyncSummary { fn from(value: &StateSyncUpdate) -> Self { SyncSummary::new( - value.block_header.block_num(), + value.block_num, value.note_updates.committed_note_ids().into_iter().collect(), value.note_updates.consumed_note_ids().into_iter().collect(), value @@ -62,3 +56,126 @@ impl From<&StateSyncUpdate> for SyncSummary { ) } } + +/// Contains all the block information that needs to be added in the client's store after a sync. +#[derive(Debug, Clone, Default)] +pub struct BlockUpdates { + /// New block headers to be stored, along with a flag indicating whether the block contains + /// notes that are relevant to the client and the MMR peaks for the block. + block_headers: Vec<(BlockHeader, bool, MmrPeaks)>, + /// New authentication nodes that are meant to be stored in order to authenticate block + /// headers. + new_authentication_nodes: Vec<(InOrderIndex, Digest)>, +} + +impl BlockUpdates { + /// Creates a new instance of [`BlockUpdates`]. + pub fn new( + block_headers: Vec<(BlockHeader, bool, MmrPeaks)>, + new_authentication_nodes: Vec<(InOrderIndex, Digest)>, + ) -> Self { + Self { block_headers, new_authentication_nodes } + } + + /// Returns the new block headers to be stored, along with a flag indicating whether the block + /// contains notes that are relevant to the client and the MMR peaks for the block. + pub fn block_headers(&self) -> &[(BlockHeader, bool, MmrPeaks)] { + &self.block_headers + } + + /// Returns the new authentication nodes that are meant to be stored in order to authenticate + /// block headers. + pub fn new_authentication_nodes(&self) -> &[(InOrderIndex, Digest)] { + &self.new_authentication_nodes + } + + /// Extends the current [`BlockUpdates`] with the provided one. + pub(crate) fn extend(&mut self, other: BlockUpdates) { + self.block_headers.extend(other.block_headers); + self.new_authentication_nodes.extend(other.new_authentication_nodes); + } +} + +/// Contains transaction changes to apply to the store. +#[derive(Default)] +pub struct TransactionUpdates { + /// Transaction updates for any transaction that was committed between the sync request's block + /// number and the response's block number. + committed_transactions: Vec, + /// Transaction IDs for any transactions that were discarded in the sync. + discarded_transactions: Vec, +} + +impl TransactionUpdates { + /// Creates a new [`TransactionUpdate`] + pub fn new( + committed_transactions: Vec, + discarded_transactions: Vec, + ) -> Self { + Self { + committed_transactions, + discarded_transactions, + } + } + + /// Extends the transaction update information with `other`. + pub fn extend(&mut self, other: Self) { + self.committed_transactions.extend(other.committed_transactions); + self.discarded_transactions.extend(other.discarded_transactions); + } + + /// Returns a reference to committed transactions. + pub fn committed_transactions(&self) -> &[TransactionUpdate] { + &self.committed_transactions + } + + /// Returns a reference to discarded transactions. + pub fn discarded_transactions(&self) -> &[TransactionId] { + &self.discarded_transactions + } +} + +// ACCOUNT UPDATES +// ================================================================================================ + +#[derive(Debug, Clone, Default)] +/// Contains account changes to apply to the store after a sync request. +pub struct AccountUpdates { + /// Updated public accounts. + updated_public_accounts: Vec, + /// Account hashes received from the network that don't match the currently locally-tracked + /// state of the private accounts. + /// + /// These updates may represent a stale account hash (meaning that the latest local state + /// hasn't been committed). If this is not the case, the account may be locked until the state + /// is restored manually. + mismatched_private_accounts: Vec<(AccountId, Digest)>, +} + +impl AccountUpdates { + /// Creates a new instance of `AccountUpdates`. + pub fn new( + updated_public_accounts: Vec, + mismatched_private_accounts: Vec<(AccountId, Digest)>, + ) -> Self { + Self { + updated_public_accounts, + mismatched_private_accounts, + } + } + + /// Returns the updated public accounts. + pub fn updated_public_accounts(&self) -> &[Account] { + &self.updated_public_accounts + } + + /// Returns the mismatched private accounts. + pub fn mismatched_private_accounts(&self) -> &[(AccountId, Digest)] { + &self.mismatched_private_accounts + } + + pub fn extend(&mut self, other: AccountUpdates) { + self.updated_public_accounts.extend(other.updated_public_accounts); + self.mismatched_private_accounts.extend(other.mismatched_private_accounts); + } +} diff --git a/crates/rust-client/src/tests.rs b/crates/rust-client/src/tests.rs index 391fb243d..b31a9bc9f 100644 --- a/crates/rust-client/src/tests.rs +++ b/crates/rust-client/src/tests.rs @@ -360,7 +360,7 @@ async fn test_sync_state_mmr() { ); // Try reconstructing the chain_mmr from what's in the database - let partial_mmr = client.build_current_partial_mmr(true).await.unwrap(); + let partial_mmr = client.build_current_partial_mmr().await.unwrap(); assert_eq!(partial_mmr.forest(), 6); assert!(partial_mmr.open(0).unwrap().is_some()); // Account anchor block assert!(partial_mmr.open(1).unwrap().is_some()); diff --git a/crates/rust-client/src/transaction/mod.rs b/crates/rust-client/src/transaction/mod.rs index 9767e4cab..efad02d46 100644 --- a/crates/rust-client/src/transaction/mod.rs +++ b/crates/rust-client/src/transaction/mod.rs @@ -95,7 +95,7 @@ use super::{Client, FeltRng}; use crate::{ account::procedure_roots::RPO_FALCON_512_AUTH, note::{NoteScreener, NoteUpdates}, - rpc::domain::{account::AccountProof, transaction::TransactionUpdate}, + rpc::domain::account::AccountProof, store::{ input_note_states::ExpectedNoteState, InputNoteRecord, InputNoteState, NoteFilter, OutputNoteRecord, StoreError, TransactionFilter, @@ -369,50 +369,6 @@ impl TransactionStoreUpdate { } } -/// Contains transaction changes to apply to the store. -#[derive(Default)] -pub struct TransactionUpdates { - /// Transaction updates for any transaction that was committed between the sync request's block - /// number and the response's block number. - committed_transactions: Vec, - /// Transaction IDs for any transactions that were discarded in the sync. - discarded_transactions: Vec, -} - -impl TransactionUpdates { - /// Creates a new [`TransactionUpdate`] - pub fn new( - committed_transactions: Vec, - discarded_transactions: Vec, - ) -> Self { - Self { - committed_transactions, - discarded_transactions, - } - } - - /// Extends the transaction update information with `other`. - pub fn extend(&mut self, other: Self) { - self.committed_transactions.extend(other.committed_transactions); - self.discarded_transactions.extend(other.discarded_transactions); - } - - /// Returns a reference to committed transactions. - pub fn committed_transactions(&self) -> &[TransactionUpdate] { - &self.committed_transactions - } - - /// Returns a reference to discarded transactions. - pub fn discarded_transactions(&self) -> &[TransactionId] { - &self.discarded_transactions - } - - /// Inserts a discarded transaction into the transaction updates. - pub fn insert_discarded_transaction(&mut self, transaction_id: TransactionId) { - self.discarded_transactions.push(transaction_id); - } -} - /// Transaction management methods impl Client { // TRANSACTION DATA RETRIEVAL @@ -934,7 +890,7 @@ impl Client { let summary = self.sync_state().await?; if summary.block_num != block_num { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.build_current_partial_mmr().await?; self.get_and_store_authenticated_block(block_num, &mut current_partial_mmr) .await?; } diff --git a/tests/integration/custom_transactions_tests.rs b/tests/integration/custom_transactions_tests.rs index 002c8b4d0..63410797f 100644 --- a/tests/integration/custom_transactions_tests.rs +++ b/tests/integration/custom_transactions_tests.rs @@ -1,6 +1,7 @@ use miden_client::{ note::NoteExecutionHint, - transaction::{TransactionRequest, TransactionRequestBuilder}, + store::NoteFilter, + transaction::{InputNote, TransactionRequest, TransactionRequestBuilder}, utils::{Deserializable, Serializable}, ZERO, }; @@ -224,6 +225,83 @@ async fn test_merkle_store() { client.sync_state().await.unwrap(); } +#[tokio::test] +async fn test_onchain_notes_sync_with_tag() { + // Client 1 has an private faucet which will mint an onchain note for client 2 + let (mut client_1, keystore_1) = create_test_client().await; + // Client 2 will be used to sync and check that by adding the tag we can still fetch notes + // whose tag doesn't necessarily match any of its accounts + let (mut client_2, keystore_2) = create_test_client().await; + // Client 3 will be the control client. We won't add any tags and expect the note not to be + // fetched + let (mut client_3, ..) = create_test_client().await; + wait_for_node(&mut client_3).await; + + // Create accounts + let (basic_account_1, ..) = + insert_new_wallet(&mut client_1, AccountStorageMode::Private, &keystore_1) + .await + .unwrap(); + + insert_new_wallet(&mut client_2, AccountStorageMode::Private, &keystore_2) + .await + .unwrap(); + + client_1.sync_state().await.unwrap(); + client_2.sync_state().await.unwrap(); + client_3.sync_state().await.unwrap(); + + // Create the custom note + let note_script = " + begin + push.1 push.1 + assert_eq + end + "; + let note_script = client_1.compile_note_script(note_script).unwrap(); + let inputs = NoteInputs::new(vec![]).unwrap(); + let serial_num = client_1.rng().draw_word(); + let note_metadata = NoteMetadata::new( + basic_account_1.id(), + NoteType::Public, + NoteTag::from_account_id(basic_account_1.id(), NoteExecutionMode::Local).unwrap(), + NoteExecutionHint::None, + Default::default(), + ) + .unwrap(); + let note_assets = NoteAssets::new(vec![]).unwrap(); + let note_recipient = NoteRecipient::new(serial_num, note_script, inputs); + let note = Note::new(note_assets, note_metadata, note_recipient); + + // Send transaction and wait for it to be committed + let tx_request = TransactionRequestBuilder::new() + .with_own_output_notes(vec![OutputNote::Full(note.clone())]) + .build() + .unwrap(); + + let note = tx_request.expected_output_notes().next().unwrap().clone(); + execute_tx_and_sync(&mut client_1, basic_account_1.id(), tx_request).await; + + // Load tag into client 2 + client_2 + .add_note_tag( + NoteTag::from_account_id(basic_account_1.id(), NoteExecutionMode::Local).unwrap(), + ) + .await + .unwrap(); + + // Client 2's account should receive the note here: + client_2.sync_state().await.unwrap(); + client_3.sync_state().await.unwrap(); + + // Assert that the note is the same + let received_note: InputNote = + client_2.get_input_note(note.id()).await.unwrap().unwrap().try_into().unwrap(); + assert_eq!(received_note.note().hash(), note.hash()); + assert_eq!(received_note.note(), ¬e); + assert!(client_3.get_input_notes(NoteFilter::All).await.unwrap().is_empty()); +} + async fn mint_custom_note( client: &mut TestClient, faucet_account_id: AccountId, diff --git a/tests/integration/onchain_tests.rs b/tests/integration/onchain_tests.rs index d0ab57749..a15ae6586 100644 --- a/tests/integration/onchain_tests.rs +++ b/tests/integration/onchain_tests.rs @@ -1,14 +1,13 @@ use miden_client::{ - account::build_wallet_id, auth::AuthSecretKey, authenticator::keystore::KeyStore, store::{InputNoteState, NoteFilter}, transaction::{PaymentTransactionData, TransactionRequestBuilder}, }; use miden_objects::{ - account::{AccountId, AccountStorageMode}, + account::AccountStorageMode, asset::{Asset, FungibleAsset}, - note::{NoteFile, NoteTag, NoteType}, + note::{NoteFile, NoteType}, transaction::InputNote, }; @@ -92,9 +91,34 @@ async fn test_onchain_notes_flow() { .unwrap(); execute_tx_and_sync(&mut client_2, basic_wallet_1.id(), tx_request).await; + // Create a note for client 3 that is already consumed before syncing + let tx_request = TransactionRequestBuilder::pay_to_id( + PaymentTransactionData::new( + vec![p2id_asset.into()], + basic_wallet_1.id(), + basic_wallet_2.id(), + ), + Some(1.into()), + NoteType::Public, + client_2.rng(), + ) + .unwrap() + .build() + .unwrap(); + let note = tx_request.expected_output_notes().next().unwrap().clone(); + execute_tx_and_sync(&mut client_2, basic_wallet_1.id(), tx_request).await; + + let tx_request = TransactionRequestBuilder::consume_notes(vec![note.id()]).build().unwrap(); + execute_tx_and_sync(&mut client_2, basic_wallet_1.id(), tx_request).await; + // sync client 3 (basic account 2) client_3.sync_state().await.unwrap(); - // client 3 should only have one note + + // client 3 should have two notes, the one directed to them and the one consumed by client 2 + // (which should come from the tag added) + assert_eq!(client_3.get_input_notes(NoteFilter::Committed).await.unwrap().len(), 1); + assert_eq!(client_3.get_input_notes(NoteFilter::Consumed).await.unwrap().len(), 1); + let note = client_3 .get_input_notes(NoteFilter::Committed) .await @@ -289,131 +313,3 @@ async fn test_onchain_accounts() { assert_eq!(new_from_account_balance, from_account_balance - TRANSFER_AMOUNT); assert_eq!(new_to_account_balance, to_account_balance + TRANSFER_AMOUNT); } - -#[tokio::test] -async fn test_onchain_notes_sync_with_tag() { - // Client 1 has an private faucet which will mint an onchain note for client 2 - let (mut client_1, keystore) = create_test_client().await; - // Client 2 will be used to sync and check that by adding the tag we can still fetch notes - // whose tag doesn't necessarily match any of its accounts - let (mut client_2, _) = create_test_client().await; - // Client 3 will be the control client. We won't add any tags and expect the note not to be - // fetched - let (mut client_3, _) = create_test_client().await; - wait_for_node(&mut client_3).await; - - // Create faucet account - let (faucet_account, ..) = - insert_new_fungible_faucet(&mut client_1, AccountStorageMode::Private, &keystore) - .await - .unwrap(); - - client_1.sync_state().await.unwrap(); - client_2.sync_state().await.unwrap(); - client_3.sync_state().await.unwrap(); - - let target_account_id = AccountId::try_from(ACCOUNT_ID_REGULAR).unwrap(); - - let tx_request = TransactionRequestBuilder::mint_fungible_asset( - FungibleAsset::new(faucet_account.id(), MINT_AMOUNT).unwrap(), - target_account_id, - NoteType::Public, - client_1.rng(), - ) - .unwrap() - .build() - .unwrap(); - let note = tx_request.expected_output_notes().next().unwrap().clone(); - execute_tx_and_sync(&mut client_1, faucet_account.id(), tx_request).await; - - // Load tag into client 2 - client_2 - .add_note_tag( - NoteTag::from_account_id( - target_account_id, - miden_objects::note::NoteExecutionMode::Local, - ) - .unwrap(), - ) - .await - .unwrap(); - - // Client 2's account should receive the note here: - client_2.sync_state().await.unwrap(); - client_3.sync_state().await.unwrap(); - - // Assert that the note is the same - let received_note: InputNote = - client_2.get_input_note(note.id()).await.unwrap().unwrap().try_into().unwrap(); - assert_eq!(received_note.note().hash(), note.hash()); - assert_eq!(received_note.note(), ¬e); - assert!(client_3.get_input_notes(NoteFilter::All).await.unwrap().is_empty()); -} - -#[tokio::test] -async fn test_import_account_by_id() { - let (mut client_1, keystore_1) = create_test_client().await; - let (mut client_2, keystore_2) = create_test_client().await; - wait_for_node(&mut client_1).await; - - let mut user_seed = [0u8; 32]; - client_1.rng().fill_bytes(&mut user_seed); - - let (faucet_account_header, ..) = - insert_new_fungible_faucet(&mut client_1, AccountStorageMode::Public, &keystore_1) - .await - .unwrap(); - - let (first_regular_account, _, secret_key) = insert_new_wallet_with_seed( - &mut client_1, - AccountStorageMode::Public, - &keystore_1, - user_seed, - ) - .await - .unwrap(); - - let target_account_id = first_regular_account.id(); - let faucet_account_id = faucet_account_header.id(); - - // First mint and consume in the first client - println!("First client consuming note"); - let note = - mint_note(&mut client_1, target_account_id, faucet_account_id, NoteType::Public).await; - - consume_notes(&mut client_1, target_account_id, &[note]).await; - - // Mint a note for the second client - let note = - mint_note(&mut client_1, target_account_id, faucet_account_id, NoteType::Public).await; - - // Import the public account by id - let anchor_block = client_1.get_latest_epoch_block().await.unwrap(); - let built_wallet_id = build_wallet_id( - user_seed, - secret_key.public_key(), - AccountStorageMode::Public, - false, - &anchor_block, - ) - .unwrap(); - assert_eq!(built_wallet_id, first_regular_account.id()); - client_2.import_account_by_id(built_wallet_id).await.unwrap(); - keystore_2.add_key(&AuthSecretKey::RpoFalcon512(secret_key)).unwrap(); - - let original_account = client_1.get_account(first_regular_account.id()).await.unwrap().unwrap(); - let imported_account = client_2.get_account(first_regular_account.id()).await.unwrap().unwrap(); - assert_eq!(imported_account.account().hash(), original_account.account().hash()); - - // Now use the wallet in the second client to consume the generated note - println!("Second client consuming note"); - client_2.sync_state().await.unwrap(); - consume_notes(&mut client_2, target_account_id, &[note]).await; - assert_account_has_single_asset( - &client_2, - target_account_id, - faucet_account_id, - MINT_AMOUNT * 2, - ) - .await; -}