From 993149590969ee12a07fca2888e7d124ef911375 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Wed, 31 Jan 2024 09:19:56 +0200 Subject: [PATCH] high-level sync logic --- base_layer/common_types/src/chain_metadata.rs | 2 +- .../states/horizon_state_sync.rs | 37 +++------- .../states/sync_decide.rs | 74 ++++++++++--------- .../sync/horizon_state_sync/synchronizer.rs | 45 ++++++----- base_layer/core/tests/tests/horizon_sync.rs | 25 ++++--- 5 files changed, 93 insertions(+), 90 deletions(-) diff --git a/base_layer/common_types/src/chain_metadata.rs b/base_layer/common_types/src/chain_metadata.rs index 200aae22a6b..b597d87a599 100644 --- a/base_layer/common_types/src/chain_metadata.rs +++ b/base_layer/common_types/src/chain_metadata.rs @@ -73,7 +73,7 @@ impl ChainMetadata { pub fn pruned_height_at_given_chain_tip(&self, chain_tip: u64) -> u64 { match self.pruning_horizon { 0 => 0, - horizon => chain_tip.saturating_sub(horizon), + pruning_horizon => chain_tip.saturating_sub(pruning_horizon), } } diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs index ef79a01a232..e5a1dde640c 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs @@ -58,42 +58,29 @@ impl HorizonStateSync { }; let sync_peers = &mut self.sync_peers; - // // Order sync peers according to accumulated difficulty + // Order sync peers according to accumulated difficulty sync_peers.sort_by(|a, b| { b.claimed_chain_metadata() .accumulated_difficulty() .cmp(&a.claimed_chain_metadata().accumulated_difficulty()) }); - let remote_metadata = match sync_peers.first() { - Some(peer) => peer.claimed_chain_metadata(), - None => return StateEvent::FatalError("No sync peers".into()), - }; - let horizon_sync_height = - 
local_metadata.pruned_height_at_given_chain_tip(remote_metadata.height_of_longest_chain()); + // Target horizon sync height based on the last header we have synced + let last_header = match shared.db.fetch_last_header().await { + Ok(h) => h, + Err(err) => return err.into(), + }; + let target_horizon_sync_height = local_metadata.pruned_height_at_given_chain_tip(last_header.height); // Determine if we need to sync horizon state - if local_metadata.pruned_height() >= horizon_sync_height { - info!(target: LOG_TARGET, "Horizon state was already synchronized."); + if local_metadata.pruned_height() >= target_horizon_sync_height { + info!(target: LOG_TARGET, "Horizon state is already synchronized."); return StateEvent::HorizonStateSynchronized; } - match shared.db.fetch_last_header().await { - Ok(header) => { - if header.height < horizon_sync_height { - info!( - target: LOG_TARGET, - "Horizon state sync height is higher than our header tip height. \ - Waiting for header sync to catch up." - ); - return StateEvent::HorizonStateSynchronized; - } - }, - Err(err) => return err.into(), - } - if local_metadata.height_of_longest_chain() >= horizon_sync_height { + if local_metadata.height_of_longest_chain() >= target_horizon_sync_height { info!( target: LOG_TARGET, - "Tip height is higher than our pruned height. Horizon state is already synchronized." + "Our tip height is higher than our target pruned height. Horizon state is already synchronized." 
); return StateEvent::HorizonStateSynchronized; } @@ -110,7 +97,7 @@ impl HorizonStateSync { connectivity, rules, sync_peers, - horizon_sync_height, + target_horizon_sync_height, prover, validator, ); diff --git a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs index 995fb96b2fc..be62feed696 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs @@ -64,17 +64,21 @@ impl DecideNextSync { if local_metadata.pruning_horizon() > 0 { // Filter sync peers that claim to be able to provide blocks up until our pruned height debug!(target: LOG_TARGET, "Local metadata: {}", local_metadata); - let sync_peers = self - .sync_peers + let mut sync_peers = self.sync_peers.clone(); + let sync_peers = sync_peers .drain(..) .filter(|sync_peer| { let remote_metadata = sync_peer.claimed_chain_metadata(); debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata); let remote_is_archival_node = remote_metadata.pruned_height() == 0; let general_sync_conditions = + // Must be able to provide the correct amount of full blocks past the pruned height (i.e. 
the + // pruning horizon), otherwise our horizon spec will not be met + remote_metadata.height_of_longest_chain().saturating_sub(remote_metadata.pruned_height()) >= + local_metadata.pruning_horizon() && // Must have a better blockchain tip than us remote_metadata.height_of_longest_chain() > local_metadata.height_of_longest_chain() && - // Must be able to provide full blocks past the pruned height (within the pruning horizon) + // Must be able to provide full blocks from the height we need detailed information remote_metadata.pruned_height() <= local_metadata.height_of_longest_chain(); let sync_from_prune_node = !remote_is_archival_node && // Must have done initial sync (to detect genesis TXO spends) @@ -86,45 +90,43 @@ impl DecideNextSync { if sync_peers.is_empty() { warn!( target: LOG_TARGET, - "Unable to find any appropriate sync peers for horizon sync" + "Unable to find any appropriate sync peers for horizon sync, trying for block sync" ); - return Continue; - } - - debug!( - target: LOG_TARGET, - "Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}", - sync_peers.len(), - sync_peers.first().map(|p| p.latency()).unwrap_or_default() - ); - ProceedToHorizonSync(sync_peers) - } else { - // Filter sync peers that are able to provide full blocks from our current tip - let sync_peers = self - .sync_peers - .drain(..) 
- .filter(|sync_peer| { - let remote_metadata = sync_peer.claimed_chain_metadata(); - remote_metadata.pruned_height() <= local_metadata.height_of_longest_chain() - }) - .collect::<Vec<_>>(); - - if sync_peers.is_empty() { - warn!( + } else { + debug!( target: LOG_TARGET, - "Unable to find any appropriate sync peers for block sync" + "Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}", + sync_peers.len(), + sync_peers.first().map(|p| p.latency()).unwrap_or_default() ); - return Continue; + return ProceedToHorizonSync(sync_peers); } + } + + // This is not a pruned node or horizon sync is not possible, try for block sync + + // Filter sync peers that are able to provide full blocks from our current tip + let sync_peers = self + .sync_peers + .drain(..) + .filter(|sync_peer| { + let remote_metadata = sync_peer.claimed_chain_metadata(); + remote_metadata.pruned_height() <= local_metadata.height_of_longest_chain() + }) + .collect::<Vec<_>>(); - debug!( - target: LOG_TARGET, - "Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}", - sync_peers.len(), - sync_peers.first().map(|p| p.latency()).unwrap_or_default() - ); - ProceedToBlockSync(sync_peers) + if sync_peers.is_empty() { + warn!(target: LOG_TARGET, "Unable to find any appropriate sync peers for block sync"); + return Continue; } + + debug!( + target: LOG_TARGET, + "Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}", + sync_peers.len(), + sync_peers.first().map(|p| p.latency()).unwrap_or_default() + ); + ProceedToBlockSync(sync_peers) } } diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 49091226275..57caacfd6c4 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -43,6 +43,7 @@ use crate::{ hooks::Hooks, 
horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus}, rpc, + rpc::BaseNodeSyncRpcClient, BlockchainSyncConfig, SyncPeer, }, @@ -217,6 +218,25 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { node_id: &NodeId, to_header: &BlockHeader, ) -> Result<(), HorizonSyncError> { + // Connect + let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?; + + // Perform horizon sync + debug!(target: LOG_TARGET, "Check if pruning is needed"); + self.prune_if_needed().await?; + self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header) + .await?; + + // Validate and finalize horizon sync + self.finalize_horizon_sync(&sync_peer).await?; + + Ok(()) + } + + async fn connect_sync_peer( + &mut self, + node_id: &NodeId, + ) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> { let peer_index = self .get_sync_peer_index(node_id) .ok_or(HorizonSyncError::PeerNotFound)?; @@ -248,14 +268,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { max_latency: self.max_latency, }); } - debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency); - let sync_peer = self.sync_peers[peer_index].clone(); - - self.begin_sync(sync_peer.clone(), &mut client, to_header).await?; - self.finalize_horizon_sync(&sync_peer).await?; - Ok(()) + Ok((client, self.sync_peers[peer_index].clone())) } async fn dial_sync_peer(&self, node_id: &NodeId) -> Result { @@ -271,18 +286,16 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { Ok(conn) } - async fn begin_sync( + async fn sync_kernels_and_outputs( &mut self, sync_peer: SyncPeer, client: &mut rpc::BaseNodeSyncRpcClient, to_header: &BlockHeader, ) -> Result<(), HorizonSyncError> { - debug!(target: LOG_TARGET, "Initializing"); - self.initialize().await?; - + // Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into + // the database. 
Furthermore, these kernels will also be successfully removed when we need to rewind + // the blockchain for whatever reason. debug!(target: LOG_TARGET, "Synchronizing kernels"); - // We do not need to rewind kernels if the sync fails due to it being validated when inserted into the database - // and it will also be removed when rewinding the blockchain without any issues self.synchronize_kernels(sync_peer.clone(), client, to_header).await?; debug!(target: LOG_TARGET, "Synchronizing outputs"); match self.synchronize_outputs(sync_peer, client, to_header).await { @@ -361,14 +374,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } } - async fn initialize(&mut self) -> Result<(), HorizonSyncError> { - let db = self.db(); - let local_metadata = db.get_chain_metadata().await?; - + async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> { + let local_metadata = self.db.get_chain_metadata().await?; let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height); if local_metadata.pruned_height() < new_prune_height { debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height); - db.prune_to_height(new_prune_height).await?; + self.db.prune_to_height(new_prune_height).await?; } Ok(()) diff --git a/base_layer/core/tests/tests/horizon_sync.rs b/base_layer/core/tests/tests/horizon_sync.rs index 2f3e819819a..df83120a60b 100644 --- a/base_layer/core/tests/tests/horizon_sync.rs +++ b/base_layer/core/tests/tests/horizon_sync.rs @@ -197,7 +197,7 @@ async fn test_horizon_sync_from_archival_node_happy_path() { // 7. Alice attempts horizon sync to the new pruning height (to height 20 - STXOs should be pruned) Outputs created // after height 10 and spent up to height 20 with corresponding inputs should not be streamed; we do not have way - // to verify this except= looking at the detail log files. + // to verify this except looking at the detail log files. println!("\n7. 
Alice attempts horizon sync to the new pruning height (to height 20 - STXOs should be pruned)\n"); let spent_coinbases = coinbases .iter() @@ -399,8 +399,9 @@ async fn test_horizon_sync_from_prune_node_happy_path() { println!("Event: {} to header {}", state_event(&event), carol_header_height); assert_eq!(carol_header_height, 8); let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_bob).await; - if let StateEvent::ProceedToHorizonSync(_) = event { - panic!("2. Carol should not proceed to horizon sync") + match event { + StateEvent::ProceedToBlockSync(_) => println!("Carol chose `ProceedToBlockSync` instead"), + _ => panic!("2. Carol should not choose '{:?}'", event), } // Give Bob some more blocks @@ -470,8 +471,9 @@ async fn test_horizon_sync_from_prune_node_happy_path() { println!("Event: {} to header {}", state_event(&event), carol_header_height); assert_eq!(carol_header_height, 13); let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_alice).await; - if let StateEvent::ProceedToHorizonSync(_) = event { - panic!("5 Carol should not proceed to horizon sync") + match event { + StateEvent::Continue => println!("Carol chose `Continue` instead"), + _ => panic!("5. Carol should not choose '{:?}'", event), } // Alice will not be banned assert!(!sync::wait_for_is_peer_banned(&carol_node, alice_node.node_identity.node_id(), 1).await); @@ -559,8 +561,8 @@ async fn test_horizon_sync_from_prune_node_happy_path() { // Bob will not be banned assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); - // 9. Carol attempts horizon sync from Alice (to height 6) - println!("\n9. Carol attempts horizon sync from Alice (to height 6)\n"); + // 9. Carol attempts horizon sync from Alice with inadequate pruning horizon (to height 6) + println!("\n9. 
Carol attempts horizon sync from Alice with inadequate pruning horizon (to height 6)\n"); let mut header_sync_carol_from_alice = sync::initialize_sync_headers_with_ping_pong_data(&carol_node, &alice_node); let event = sync::sync_headers_execute(&mut carol_state_machine, &mut header_sync_carol_from_alice).await; @@ -568,8 +570,9 @@ async fn test_horizon_sync_from_prune_node_happy_path() { println!("Event: {} to header {}", state_event(&event), carol_header_height); assert_eq!(carol_header_height, 18); let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_alice).await; - if let StateEvent::ProceedToHorizonSync(_) = event { - panic!("9. Carol should not proceed to horizon sync") + match event { + StateEvent::Continue => println!("Carol chose `Continue` instead"), + _ => panic!("9. Carol should not choose '{:?}'", event), } // Alice will not be banned assert!(!sync::wait_for_is_peer_banned(&carol_node, alice_node.node_identity.node_id(), 1).await); @@ -629,8 +632,8 @@ async fn test_horizon_sync_from_prune_node_happy_path() { // Bob will not be banned assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await); - // 12. Alice attempts horizon sync from Carol (to height 18) - println!("\n12. Alice attempts horizon sync from Carol (to height 18)\n"); + // 12. Alice attempts horizon sync from Carol with adequate pruning horizon (to height 18) + println!("\n12. Alice attempts horizon sync from Carol with adequate pruning horizon (to height 18)\n"); let mut header_sync_alice_from_carol = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &carol_node); let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync_alice_from_carol).await;