Skip to content

Commit

Permalink
high-level sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Jan 31, 2024
1 parent 18355f4 commit 9931495
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 90 deletions.
2 changes: 1 addition & 1 deletion base_layer/common_types/src/chain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ChainMetadata {
pub fn pruned_height_at_given_chain_tip(&self, chain_tip: u64) -> u64 {
match self.pruning_horizon {
0 => 0,
horizon => chain_tip.saturating_sub(horizon),
pruning_horizon => chain_tip.saturating_sub(pruning_horizon),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,42 +58,29 @@ impl HorizonStateSync {
};

let sync_peers = &mut self.sync_peers;
// // Order sync peers according to accumulated difficulty
// Order sync peers according to accumulated difficulty
sync_peers.sort_by(|a, b| {
b.claimed_chain_metadata()
.accumulated_difficulty()
.cmp(&a.claimed_chain_metadata().accumulated_difficulty())
});
let remote_metadata = match sync_peers.first() {
Some(peer) => peer.claimed_chain_metadata(),
None => return StateEvent::FatalError("No sync peers".into()),
};

let horizon_sync_height =
local_metadata.pruned_height_at_given_chain_tip(remote_metadata.height_of_longest_chain());
// Target horizon sync height based on the last header we have synced
let last_header = match shared.db.fetch_last_header().await {
Ok(h) => h,
Err(err) => return err.into(),
};
let target_horizon_sync_height = local_metadata.pruned_height_at_given_chain_tip(last_header.height);

// Determine if we need to sync horizon state
if local_metadata.pruned_height() >= horizon_sync_height {
info!(target: LOG_TARGET, "Horizon state was already synchronized.");
if local_metadata.pruned_height() >= target_horizon_sync_height {
info!(target: LOG_TARGET, "Horizon state is already synchronized.");
return StateEvent::HorizonStateSynchronized;
}
match shared.db.fetch_last_header().await {
Ok(header) => {
if header.height < horizon_sync_height {
info!(
target: LOG_TARGET,
"Horizon state sync height is higher than our header tip height. \
Waiting for header sync to catch up."
);
return StateEvent::HorizonStateSynchronized;
}
},
Err(err) => return err.into(),
}
if local_metadata.height_of_longest_chain() >= horizon_sync_height {
if local_metadata.height_of_longest_chain() >= target_horizon_sync_height {
info!(
target: LOG_TARGET,
"Tip height is higher than our pruned height. Horizon state is already synchronized."
"Our tip height is higher than our target pruned height. Horizon state is already synchronized."
);
return StateEvent::HorizonStateSynchronized;
}
Expand All @@ -110,7 +97,7 @@ impl HorizonStateSync {
connectivity,
rules,
sync_peers,
horizon_sync_height,
target_horizon_sync_height,
prover,
validator,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,21 @@ impl DecideNextSync {
if local_metadata.pruning_horizon() > 0 {
// Filter sync peers that claim to be able to provide blocks up until our pruned height
debug!(target: LOG_TARGET, "Local metadata: {}", local_metadata);
let sync_peers = self
.sync_peers
let mut sync_peers = self.sync_peers.clone();
let sync_peers = sync_peers
.drain(..)
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata);
let remote_is_archival_node = remote_metadata.pruned_height() == 0;
let general_sync_conditions =
// Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
// pruning horizon), otherwise our horizon spec will not be met
remote_metadata.height_of_longest_chain().saturating_sub(remote_metadata.pruned_height()) >=
local_metadata.pruning_horizon() &&
// Must have a better blockchain tip than us
remote_metadata.height_of_longest_chain() > local_metadata.height_of_longest_chain() &&
// Must be able to provide full blocks past the pruned height (within the pruning horizon)
// Must be able to provide full blocks from the height we need detailed information
remote_metadata.pruned_height() <= local_metadata.height_of_longest_chain();
let sync_from_prune_node = !remote_is_archival_node &&
// Must have done initial sync (to detect genesis TXO spends)
Expand All @@ -86,45 +90,43 @@ impl DecideNextSync {
if sync_peers.is_empty() {
warn!(
target: LOG_TARGET,
"Unable to find any appropriate sync peers for horizon sync"
"Unable to find any appropriate sync peers for horizon sync, trying for block sync"
);
return Continue;
}

debug!(
target: LOG_TARGET,
"Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
ProceedToHorizonSync(sync_peers)
} else {
// Filter sync peers that are able to provide full blocks from our current tip
let sync_peers = self
.sync_peers
.drain(..)
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
remote_metadata.pruned_height() <= local_metadata.height_of_longest_chain()
})
.collect::<Vec<_>>();

if sync_peers.is_empty() {
warn!(
} else {
debug!(
target: LOG_TARGET,
"Unable to find any appropriate sync peers for block sync"
"Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
return Continue;
return ProceedToHorizonSync(sync_peers);
}
}

// This is not a pruned node or horizon sync is not possible, try for block sync

// Filter sync peers that are able to provide full blocks from our current tip
let sync_peers = self
.sync_peers
.drain(..)
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
remote_metadata.pruned_height() <= local_metadata.height_of_longest_chain()
})
.collect::<Vec<_>>();

debug!(
target: LOG_TARGET,
"Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
ProceedToBlockSync(sync_peers)
if sync_peers.is_empty() {
warn!(target: LOG_TARGET, "Unable to find any appropriate sync peers for block sync");
return Continue;
}

debug!(
target: LOG_TARGET,
"Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
ProceedToBlockSync(sync_peers)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::{
hooks::Hooks,
horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus},
rpc,
rpc::BaseNodeSyncRpcClient,
BlockchainSyncConfig,
SyncPeer,
},
Expand Down Expand Up @@ -217,6 +218,25 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
node_id: &NodeId,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
// Connect
let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?;

// Perform horizon sync
debug!(target: LOG_TARGET, "Check if pruning is needed");
self.prune_if_needed().await?;
self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header)
.await?;

// Validate and finalize horizon sync
self.finalize_horizon_sync(&sync_peer).await?;

Ok(())
}

async fn connect_sync_peer(
&mut self,
node_id: &NodeId,
) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> {
let peer_index = self
.get_sync_peer_index(node_id)
.ok_or(HorizonSyncError::PeerNotFound)?;
Expand Down Expand Up @@ -248,14 +268,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
max_latency: self.max_latency,
});
}

debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
let sync_peer = self.sync_peers[peer_index].clone();

self.begin_sync(sync_peer.clone(), &mut client, to_header).await?;
self.finalize_horizon_sync(&sync_peer).await?;

Ok(())
Ok((client, self.sync_peers[peer_index].clone()))
}

async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
Expand All @@ -271,18 +286,16 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Ok(conn)
}

async fn begin_sync(
async fn sync_kernels_and_outputs(
&mut self,
sync_peer: SyncPeer,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
debug!(target: LOG_TARGET, "Initializing");
self.initialize().await?;

// Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into
// the database. Furthermore, these kernels will also be successfully removed when we need to rewind
// the blockchain for whatever reason.
debug!(target: LOG_TARGET, "Synchronizing kernels");
// We do not need to rewind kernels if the sync fails due to it being validated when inserted into the database
// and it will also be removed when rewinding the blockchain without any issues
self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
debug!(target: LOG_TARGET, "Synchronizing outputs");
match self.synchronize_outputs(sync_peer, client, to_header).await {
Expand Down Expand Up @@ -361,14 +374,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
}

async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
let db = self.db();
let local_metadata = db.get_chain_metadata().await?;

async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> {
let local_metadata = self.db.get_chain_metadata().await?;
let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
if local_metadata.pruned_height() < new_prune_height {
debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
db.prune_to_height(new_prune_height).await?;
self.db.prune_to_height(new_prune_height).await?;
}

Ok(())
Expand Down
25 changes: 14 additions & 11 deletions base_layer/core/tests/tests/horizon_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async fn test_horizon_sync_from_archival_node_happy_path() {

// 7. Alice attempts horizon sync to the new pruning height (to height 20 - STXOs should be pruned) Outputs created
// after height 10 and spent up to height 20 with corresponding inputs should not be streamed; we do not have way
// to verify this except= looking at the detail log files.
// to verify this except looking at the detail log files.
println!("\n7. Alice attempts horizon sync to the new pruning height (to height 20 - STXOs should be pruned)\n");
let spent_coinbases = coinbases
.iter()
Expand Down Expand Up @@ -399,8 +399,9 @@ async fn test_horizon_sync_from_prune_node_happy_path() {
println!("Event: {} to header {}", state_event(&event), carol_header_height);
assert_eq!(carol_header_height, 8);
let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_bob).await;
if let StateEvent::ProceedToHorizonSync(_) = event {
panic!("2. Carol should not proceed to horizon sync")
match event {
StateEvent::ProceedToBlockSync(_) => println!("Carol chose `ProceedToBlockSync` instead"),
_ => panic!("2. Carol should not choose '{:?}'", event),
}

// Give Bob some more blocks
Expand Down Expand Up @@ -470,8 +471,9 @@ async fn test_horizon_sync_from_prune_node_happy_path() {
println!("Event: {} to header {}", state_event(&event), carol_header_height);
assert_eq!(carol_header_height, 13);
let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_alice).await;
if let StateEvent::ProceedToHorizonSync(_) = event {
panic!("5 Carol should not proceed to horizon sync")
match event {
StateEvent::Continue => println!("Carol chose `Continue` instead"),
_ => panic!("5. Carol should not choose '{:?}'", event),
}
// Alice will not be banned
assert!(!sync::wait_for_is_peer_banned(&carol_node, alice_node.node_identity.node_id(), 1).await);
Expand Down Expand Up @@ -559,17 +561,18 @@ async fn test_horizon_sync_from_prune_node_happy_path() {
// Bob will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);

// 9. Carol attempts horizon sync from Alice (to height 6)
println!("\n9. Carol attempts horizon sync from Alice (to height 6)\n");
// 9. Carol attempts horizon sync from Alice with inadequate pruning horizon (to height 6)
println!("\n9. Carol attempts horizon sync from Alice with inadequate pruning horizon (to height 6)\n");

let mut header_sync_carol_from_alice = sync::initialize_sync_headers_with_ping_pong_data(&carol_node, &alice_node);
let event = sync::sync_headers_execute(&mut carol_state_machine, &mut header_sync_carol_from_alice).await;
let carol_header_height = carol_node.blockchain_db.fetch_last_header().unwrap().height;
println!("Event: {} to header {}", state_event(&event), carol_header_height);
assert_eq!(carol_header_height, 18);
let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_alice).await;
if let StateEvent::ProceedToHorizonSync(_) = event {
panic!("9. Carol should not proceed to horizon sync")
match event {
StateEvent::Continue => println!("Carol chose `Continue` instead"),
_ => panic!("9. Carol should not choose '{:?}'", event),
}
// Alice will not be banned
assert!(!sync::wait_for_is_peer_banned(&carol_node, alice_node.node_identity.node_id(), 1).await);
Expand Down Expand Up @@ -629,8 +632,8 @@ async fn test_horizon_sync_from_prune_node_happy_path() {
// Bob will not be banned
assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await);

// 12. Alice attempts horizon sync from Carol (to height 18)
println!("\n12. Alice attempts horizon sync from Carol (to height 18)\n");
// 12. Alice attempts horizon sync from Carol with adequate pruning horizon (to height 18)
println!("\n12. Alice attempts horizon sync from Carol with adequate pruning horizon (to height 18)\n");

let mut header_sync_alice_from_carol = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &carol_node);
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync_alice_from_carol).await;
Expand Down

0 comments on commit 9931495

Please sign in to comment.