Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Apr 19, 2024
1 parent b8ad59d commit 2de8c9f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 53 deletions.
77 changes: 68 additions & 9 deletions comms/core/src/net_address/multiaddr_with_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ impl MultiaddrWithStats {
self.update_quality_score();
}

#[cfg(test)]
fn get_averag_latency(&self) -> Option<Duration> {
self.avg_latency
}

pub fn update_initial_dial_time(&mut self, initial_dial_time: Duration) {
self.last_seen = Some(Utc::now().naive_utc());

Expand Down Expand Up @@ -206,33 +211,52 @@ impl MultiaddrWithStats {
self.clone().address
}

// The quality score is a measure of the reliability of the net address. It is calculated based on the following:
// - The maximum score is 'Some(1000)' points (seen within the last 1s and latency < 100ms).
// - The minimum score without any connection errors is 'Some(100)' points (seen >= 800s ago and latency >= 10s).
// - For any sort of connection error the score is 'Some(0)' points.
// - A score of `None` means it has not been tried.
fn calculate_quality_score(&self) -> Option<i32> {
// If we have never seen or attempted the peer, we start with a high score to ensure that
if self.last_seen.is_none() && self.last_attempted.is_none() {
return None;
}

let mut score_self = 0;
// The starting score
let mut score_self = 800;

// Latency score:
// - If there is no average yet, add '100' points
// - If the average latency is
// - less than 100ms, add '100' points
// - 100ms to 10,000ms', add '99' to '1' point on a sliding scale
// - 10s or more, add '0' points
if let Some(val) = self.avg_latency {
// explicitly truncate the latency to avoid casting problems
// Explicitly truncate the latency to avoid casting problems
let avg_latency_millis = i32::try_from(val.as_millis()).unwrap_or(i32::MAX);
score_self += cmp::max(0, 100i32.saturating_sub(avg_latency_millis / 100));
} else {
score_self += 100;
}

// Last seen score:
// - If the last seen time is:
// - 800s or more, subtract '700' points
// - 799s to 101s, subtract '699' to '1' point on a sliding scale
// - 100s, add or subtract nothing
// - 99s to 1s, add '1' to '99' points on a sliding scale
// - less than 1s, add '100' points
let last_seen_seconds: i32 = self
.last_seen
.map(|x| Utc::now().naive_utc() - x)
.map(|x| x.num_seconds())
.unwrap_or(i64::MAX / 2)
.try_into()
.unwrap_or(i32::MAX);
score_self += cmp::max(0, 100i32.saturating_sub(last_seen_seconds));
score_self += cmp::max(-700, 100i32.saturating_sub(last_seen_seconds));

// Any failure to connect results in a score of '0' points
if self.last_failed_reason.is_some() {
score_self -= 100;
score_self = 0;
}

Some(score_self)
Expand Down Expand Up @@ -445,13 +469,48 @@ mod test {

#[test]
fn test_calculate_quality_score() {
let address = "/ip4/123.0.0.123/tcp/8000".parse().unwrap();
let mut address = MultiaddrWithStats::new(address, PeerAddressSource::Config);
let address_raw: Multiaddr = "/ip4/123.0.0.123/tcp/8000".parse().unwrap();
let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
assert_eq!(address.quality_score, None);

address.mark_last_seen_now();
assert!(address.quality_score.unwrap() > 100);
assert!(address.quality_score.unwrap() >= 990); // 1000 with a margin of 10s (10) delayed last seen

let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
address.update_latency(Duration::from_millis(1000));
assert_eq!(address.get_averag_latency().unwrap(), Duration::from_millis(1000));
assert!(address.quality_score.unwrap() >= 980); // 990 with a margin of 10s (10) delayed last seen

let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
address.update_latency(Duration::from_millis(1500));
address.update_latency(Duration::from_millis(2500));
address.update_latency(Duration::from_millis(3500));
assert_eq!(address.get_averag_latency().unwrap(), Duration::from_millis(2500));
assert!(address.quality_score.unwrap() >= 965); // 975 with a margin of 10s (10) delayed last seen

let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
address.update_latency(Duration::from_millis(3500));
address.update_latency(Duration::from_millis(4500));
address.update_latency(Duration::from_millis(5500));
assert_eq!(address.get_averag_latency().unwrap(), Duration::from_millis(4500));
assert!(address.quality_score.unwrap() >= 945); // 955 with a margin of 10s (10) delayed last seen

let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
address.update_latency(Duration::from_millis(5500));
address.update_latency(Duration::from_millis(6500));
address.update_latency(Duration::from_millis(7500));
assert_eq!(address.get_averag_latency().unwrap(), Duration::from_millis(6500));
assert!(address.quality_score.unwrap() >= 925); // 935 with a margin of 10s (10) delayed last seen

let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
address.update_latency(Duration::from_millis(9000));
address.update_latency(Duration::from_millis(10000));
address.update_latency(Duration::from_millis(11000));
assert_eq!(address.get_averag_latency().unwrap(), Duration::from_millis(10000));
assert!(address.quality_score.unwrap() >= 890); // 900 with a margin of 10s (10) delayed last seen

address.mark_failed_connection_attempt("Testing".to_string());
assert!(address.quality_score.unwrap() <= 100);
assert_eq!(address.quality_score.unwrap(), 0);

let another_addr = "/ip4/1.0.0.1/tcp/8000".parse().unwrap();
let another_addr = MultiaddrWithStats::new(another_addr, PeerAddressSource::Config);
Expand Down
5 changes: 0 additions & 5 deletions comms/core/src/net_address/mutliaddresses_with_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,6 @@ mod test {
&PeerAddressSource::Config,
);
net_addresses.merge(&address_12);
println!("net_address12: {:?}", address_12.addresses[0]);
println!(
"net_address1: {:?}",
net_addresses.find_address_mut(&net_address1).unwrap()
);
assert!(net_addresses.contains(&net_address1));
assert!(!net_addresses.contains(&net_address12));
}
Expand Down
10 changes: 3 additions & 7 deletions comms/dht/src/network_discovery/initializing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,11 @@ const LOG_TARGET: &str = "comms::dht::network_discovery";
#[derive(Debug)]
pub(super) struct Initializing<'a> {
context: &'a mut NetworkDiscoveryContext,
initial_peer_sync_delay: Option<Duration>,
}

impl<'a> Initializing<'a> {
pub fn new(context: &'a mut NetworkDiscoveryContext, initial_peer_sync_delay: Option<Duration>) -> Self {
Self {
context,
initial_peer_sync_delay,
}
pub fn new(context: &'a mut NetworkDiscoveryContext) -> Self {
Self { context }
}

pub async fn next_event(&mut self) -> StateEvent {
Expand All @@ -59,7 +55,7 @@ impl<'a> Initializing<'a> {

// Initial discovery and refresh sync peers delay period, when a configured connection needs preference,
// usually needed for the wallet to connect to its own base node first.
if let Some(delay) = self.initial_peer_sync_delay {
if let Some(delay) = self.context.config.network_discovery.initial_peer_sync_delay {
tokio::time::sleep(delay).await;
debug!(target: LOG_TARGET, "Discovery starting after delayed for {:.0?}", delay);
}
Expand Down
7 changes: 1 addition & 6 deletions comms/dht/src/network_discovery/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,7 @@ impl DhtNetworkDiscovery {
async fn get_next_event(&mut self, state: &mut State) -> StateEvent {
use State::{Discovering, Initializing, OnConnect, Ready, Waiting};
match state {
Initializing => {
let initial_peer_sync_delay = self.config().network_discovery.initial_peer_sync_delay;
self::Initializing::new(&mut self.context, initial_peer_sync_delay)
.next_event()
.await
},
Initializing => self::Initializing::new(&mut self.context).next_event().await,
Ready(ready) => ready.next_event().await,
Discovering(discovering) => discovering.next_event().await,
OnConnect(on_connect) => on_connect.next_event().await,
Expand Down
11 changes: 5 additions & 6 deletions comms/dht/src/peer_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ impl<'a> PeerValidator<'a> {
if let Some(existing) = &existing_peer {
if existing.public_key != new_peer.public_key {
return Err(DhtPeerValidatorError::NewAndExistingMismatch {
existing: format!("'{}' / '{}'", existing.node_id.to_hex(), existing.public_key.to_hex()),
existing: format!("BUG: '{}' / '{}'", existing.node_id, existing.public_key),
new: format!(
"'{}' / '{}'",
"BUG: '{}' / '{}'",
NodeId::from_public_key(&new_peer.public_key),
new_peer.public_key.to_hex()
new_peer.public_key
),
});
}
Expand All @@ -92,12 +92,11 @@ impl<'a> PeerValidator<'a> {
let most_recent_claim = find_most_recent_claim(&new_peer.claims).expect("new_peer.claims is not empty");

let node_id = NodeId::from_public_key(&new_peer.public_key);
let node_id_hex = node_id.to_hex();

let mut peer = existing_peer.unwrap_or_else(|| {
Peer::new(
new_peer.public_key.clone(),
node_id,
node_id.clone(),
MultiaddressesWithStats::default(),
PeerFlags::default(),
most_recent_claim.features,
Expand All @@ -118,7 +117,7 @@ impl<'a> PeerValidator<'a> {
trace!(
target: LOG_TARGET,
"Peer '{}' / '{}' added with address(es) from claim: {:?}",
node_id_hex,
node_id,
new_peer.public_key.to_hex(),
claim.addresses
);
Expand Down
22 changes: 2 additions & 20 deletions comms/dht/src/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tari_comms::{
utils,
PeerManager,
};
use tari_utilities::{hex::Hex, ByteArray};
use tari_utilities::ByteArray;
use tokio::{sync::mpsc, task};

use crate::{
Expand Down Expand Up @@ -68,27 +68,9 @@ impl DhtRpcServiceImpl {
let iter = peers
.into_iter()
.filter_map(|peer| {
let mut peer_info =
let peer_info =
UnvalidatedPeerInfo::from_peer_limited_claims(peer, max_claims, max_addresses_per_claim);

// Filter out all identity claims with invalid signatures
let count = peer_info.claims.len();
let peer_public_key = peer_info.public_key.clone();
peer_info.claims.retain(|claim| {
claim
.signature
.is_valid(&peer_public_key, claim.features, claim.addresses.as_slice())
});
if count != peer_info.claims.len() {
warn!(
target: LOG_TARGET,
"Peer `{}` provided {} claims but only {} were valid",
peer_info.public_key.to_hex(),
count,
peer_info.claims.len()
);
}

if peer_info.claims.is_empty() {
None
} else {
Expand Down

0 comments on commit 2de8c9f

Please sign in to comment.