diff --git a/Cargo.toml b/Cargo.toml index be6fd2c..def63c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/accumulator", "crates/vlc", "crates/hlc", + "crates/hvlc", "crates/cops", "crates/vrf", "crates/crypto", @@ -19,7 +20,7 @@ members = [ "demos/coll_tx", "demos/vlc_dag", "demos/tee_vlc", - "demos/test_vlc_net", + "demos/test_vlc_net", ] [profile.dev] diff --git a/crates/README.md b/crates/README.md index 3877883..5d3bee9 100644 --- a/crates/README.md +++ b/crates/README.md @@ -15,6 +15,17 @@ The crates folder of Chronos includes core functional code crates and utility li - Each timestamp consists of a wall-clock time and a logical component, allowing for easy comparison and conflict resolution. - This crate is an implementation of the [Hybrid Logical Clock](http://www.cse.buffalo.edu/tech-reports/2014-04.pdf). +## [hvlc](./hvlc/) + +- This Hybrid Vector Logical Clock (HVLC) crate implements a hybrid vector clock structure that combines physical timestamps with vector clock properties. +- HVLC uses a BTreeMap to store logical clock values for multiple nodes while maintaining a physical timestamp, enabling efficient tracking of causality and concurrent events in distributed systems. +- Each clock instance contains: + - A mapping table (inner) that records logical clock values for each node ID + - A physical timestamp used to provide total ordering when logical clock comparison is insufficient +- The implementation provides core functionalities like event ordering, clock merging, and base calculation, suitable for scenarios requiring distributed causality tracking. +- Compared to regular vector clocks, HVLC offers better total ordering support through physical timestamps while maintaining the causal consistency properties of vector clocks. +- It can be used as a [CRDT](https://crdt.tech/) (Conflict-free Replicated Data Type) algorithm in distributed scenarios that require total ordering. 
+ ## [accumulator](./accumulator/) - A simple accumulator application. diff --git a/crates/hlc/src/lib.rs b/crates/hlc/src/lib.rs index ade6f49..a3087fc 100644 --- a/crates/hlc/src/lib.rs +++ b/crates/hlc/src/lib.rs @@ -84,6 +84,26 @@ pub struct State { now: F, } +impl SystemTime> PartialEq for State { + fn eq(&self, other: &Self) -> bool { + self.s == other.s + } +} + +impl SystemTime> Eq for State {} + +impl SystemTime> PartialOrd for State { + fn partial_cmp(&self, other: &Self) -> Option { + self.s.partial_cmp(&other.s) + } +} + +impl SystemTime> Ord for State { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.s.cmp(&other.s) + } +} + impl State<()> { // Creates a standard hybrid logical clock, using `std::time::SystemTime` as // supplier of the physical clock's wall time. @@ -164,6 +184,15 @@ mod tests { HLTimespec::new(s, ns, l) } + #[test] + fn hlts_comparing() { + let mut hlc = State::new(); + let hlc_1 = hlc.get_time(); + let hlc_2 = hlc.get_time(); + println!("hlc1 {:?}, \nhlc2 {:?}", hlc_1, hlc_2); + assert!(hlc_1 < hlc_2); + } + #[test] fn it_works() { // Start with a reference time for tests diff --git a/crates/hvlc/Cargo.toml b/crates/hvlc/Cargo.toml new file mode 100644 index 0000000..545a2fb --- /dev/null +++ b/crates/hvlc/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "hvlc" +version = "0.1.0" +edition = "2021" + +[dependencies] +sha2 = "0.10.8" +sha3 = "0.10.1" +rand = "0.8.5" +rand_distr = "0.4.3" +bincode = "1.3.3" +hex = "0.4.3" +tracing = "0.1.40" +futures = "0.3.30" +num_cpus = "1.13.1" +derive_more = "0.99.17" +derive-where = "1.2.7" +serde = { version = "1", features = ["derive"] } +anyhow = { version = "1.0.79", features = ["backtrace"] } +tracing-subscriber = "0.3.18" +secp256k1 = { version = "0.29.0", features = ["rand-std", "serde", "recovery"] } +tokio = { version = "1.35.1", features = [ + "net", + "time", + "sync", + "rt", + "signal", + "macros", + "rt-multi-thread", + "fs", + "process", + "io-util", +] } +tokio-util = 
"0.7.10" +crypto = { path = "../crypto", version = "0.1.0" } diff --git a/crates/hvlc/src/lib.rs b/crates/hvlc/src/lib.rs new file mode 100644 index 0000000..4c97d21 --- /dev/null +++ b/crates/hvlc/src/lib.rs @@ -0,0 +1,330 @@ +use bincode::Options; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::{cmp::Ordering, collections::BTreeMap}; +use tracing::error; + +pub trait Clock: PartialOrd + Clone + Send + Sync + 'static { + fn reduce(&self) -> LamportClock; +} + +/// A Lamport clock is a simple logical clock that counts events. +/// It is represented as an unsigned 64-bit integer. +pub type LamportClock = u64; + +impl Clock for LamportClock { + fn reduce(&self) -> LamportClock { + *self + } +} + +/// clock key_id +pub type KeyId = u64; + +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Default, derive_more::Deref, Serialize, Deserialize, +)] +pub struct HVLCClock { + #[deref] + pub inner: BTreeMap, + pub timestamp: u128, +} + +impl AsRef for HVLCClock { + fn as_ref(&self) -> &HVLCClock { + self + } +} + +impl HVLCClock { + /// Creates a new HVLCClock instance with an empty BTreeMap and current timestamp + /// Returns a new HVLCClock with: + /// - Empty inner BTreeMap for storing key-value pairs + /// - Current system timestamp in nanoseconds + pub fn new() -> Self { + Self { + inner: BTreeMap::new(), + timestamp: match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(duration) => duration.as_nanos(), + Err(e) => { + error!("New HVLCClock SystemTime error: {:?}", e); + 0 + } + }, + } + } + + /// Checks if this clock represents a genesis state + /// Returns true if all values in the inner map are 0, false otherwise + pub fn is_genesis(&self) -> bool { + self.inner.values().all(|n| *n == 0) + } + + /// Merges this clock with another clock by taking the maximum values + /// Returns a new HVLCClock containing: + /// - For each key, the maximum value between both clocks + /// - The maximum timestamp between both clocks + 
fn merge(&self, other: &Self) -> Self { + let merged = self + .inner + .keys() + .chain(other.inner.keys()) + .map(|id| { + let clock = match (self.inner.get(id), other.inner.get(id)) { + (Some(n), Some(other_n)) => (*n).max(*other_n), + (Some(n), None) => *n, + (None, Some(other_n)) => *other_n, + (None, None) => unreachable!("The key {} does not exist in either clock", id), + }; + (*id, clock) + }) + .collect(); + Self { + inner: merged, + timestamp: std::cmp::max(self.timestamp, other.timestamp), + } + } + + /// Updates this clock by merging with other clocks and incrementing a specific ID + /// Parameters: + /// - others: Iterator of other clocks to merge with + /// - id: The key ID to increment after merging + /// Returns a new HVLCClock with merged values and incremented ID + pub fn update<'a>(&'a self, others: impl Iterator, id: u64) -> Self { + let mut updated = others.fold(self.clone(), |version, dep| version.merge(dep)); + // If merging did not change the timestamp (no clocks were merged, or none carried a later timestamp), refresh it to the current time + if updated.timestamp == self.timestamp { + updated.timestamp = + match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(duration) => duration.as_nanos(), + Err(e) => { + error!("Update HVLCClock systemTime error: {:?}", e); + updated.timestamp + } + }; + } + *updated.inner.entry(id).or_default() += 1; + updated + } + + /// Creates a base clock from multiple clocks by taking minimum values + /// Parameters: + /// - others: Iterator of clocks to combine + /// Returns a new HVLCClock containing: + /// - For each key, the minimum value across the clocks that contain that key + /// - The minimum timestamp across all clocks (`u128::MAX` if `others` is empty) + pub fn base<'a>(others: impl Iterator) -> Self { + let mut combined = BTreeMap::new(); + let mut timestamp = u128::MAX; + for clock in others { + if clock.timestamp < timestamp { + timestamp = clock.timestamp; + } + for (&key, &value) in &clock.inner { + combined + .entry(key) + .and_modify(|e: &mut u64| *e = (*e).min(value)) + .or_insert(value); + 
} + } + + Self { + inner: combined, + timestamp, + } + } + + /// Calculates SHA256 hash of the clock + /// Returns a 32-byte array containing the SHA256 hash + pub fn calculate_sha256(&self) -> [u8; 32] { + let mut hasher = Sha256::new(); + let data = bincode::options() + .serialize(&self) + .expect("Failed to serialize data"); + // Update the hasher with the bincode-serialized bytes + hasher.update(data); + + // Calculate the hash & return bytes + hasher.finalize().into() + } +} + +impl PartialOrd for HVLCClock { + /// Compares two HVLCClock instances to determine their ordering. + /// + /// # Arguments + /// + /// * `other` - Another HVLCClock instance to compare against. + /// + /// # Returns + /// + /// * `Option` - Returns `Some(Ordering::Greater)` if `self` is greater, + /// `Some(Ordering::Less)` if `self` is less, or `Some(Ordering::Equal)` if they are equal. + /// This implementation never returns `None`: when neither clock dominates the other, or each dominates the other, the `timestamp` fields decide the result. + fn partial_cmp(&self, other: &Self) -> Option { + /// Helper function to determine if one clock is greater than or equal to another. + /// + /// # Arguments + /// + /// * `clock` - The HVLCClock instance to check. + /// * `other_clock` - The HVLCClock instance to compare against. + /// + /// # Returns + /// + /// * `bool` - Returns `true` if `clock` is greater than or equal to `other_clock`. 
+ fn ge(clock: &HVLCClock, other_clock: &HVLCClock) -> bool { + for (other_id, other_n) in &other_clock.inner { + if *other_n == 0 { + continue; + } + let Some(n) = clock.inner.get(other_id) else { + return false; + }; + if n < other_n { + return false; + } + } + true + } + + match (ge(self, other), ge(other, self)) { + (true, true) => Some(self.timestamp.cmp(&other.timestamp)), + (true, false) => Some(Ordering::Greater), + (false, true) => Some(Ordering::Less), + (false, false) => Some(self.timestamp.cmp(&other.timestamp)), + } + } +} + +impl HVLCClock { + pub fn dep_cmp(&self, other: &Self, id: KeyId) -> Ordering { + match (self.inner.get(&id), other.inner.get(&id)) { + // disabling this check after the definition of genesis clock has been extended + // haven't revealed any bug with this assertion before, hopefully disabling it will not + // hide any bug in the future as well + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + // this can happen on the startup insertion + (None, None) => Ordering::Equal, + (Some(n), Some(m)) => n.cmp(m), + } + } +} + +impl Clock for HVLCClock { + fn reduce(&self) -> LamportClock { + self.inner + .values() + .fold(0u128, |acc, &value| acc + value as u128) as u64 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new() { + let clock = HVLCClock::new(); + assert!(clock.inner.is_empty()); + assert!(clock.timestamp > 0); + } + + #[test] + fn test_is_genesis() { + let mut clock = HVLCClock::new(); + assert!(clock.is_genesis()); + + clock.inner.insert(1, 1); + assert!(!clock.is_genesis()); + } + + #[test] + fn test_merge() { + let mut clock1 = HVLCClock::new(); + clock1.inner.insert(1, 1); + clock1.inner.insert(2, 2); + clock1.timestamp = 100; + + let mut clock2 = HVLCClock::new(); + clock2.inner.insert(2, 3); + clock2.inner.insert(3, 1); + clock2.timestamp = 200; + + let merged = clock1.merge(&clock2); + assert_eq!(*merged.inner.get(&1).unwrap(), 1); + 
assert_eq!(*merged.inner.get(&2).unwrap(), 3); + assert_eq!(*merged.inner.get(&3).unwrap(), 1); + assert_eq!(merged.timestamp, 200); + } + + #[test] + fn test_update() { + let mut clock1 = HVLCClock::new(); + clock1.inner.insert(1, 1); + + let mut clock2 = HVLCClock::new(); + clock2.inner.insert(1, 2); + + let updated = clock1.update(vec![&clock2].into_iter(), 1); + assert_eq!(*updated.inner.get(&1).unwrap(), 3); + } + + #[test] + fn test_base() { + let mut clock1 = HVLCClock::new(); + clock1.inner.insert(1, 3); + clock1.inner.insert(2, 2); + + let mut clock2 = HVLCClock::new(); + clock2.inner.insert(1, 2); + clock2.inner.insert(2, 4); + + let base = HVLCClock::base(vec![&clock1, &clock2].into_iter()); + assert_eq!(*base.inner.get(&1).unwrap(), 2); + assert_eq!(*base.inner.get(&2).unwrap(), 2); + } + + #[test] + fn test_partial_ord() { + let mut clock1 = HVLCClock::new(); + clock1.inner.insert(1, 2); + clock1.timestamp = 100; + + let mut clock2 = HVLCClock::new(); + clock2.inner.insert(1, 1); + clock2.timestamp = 200; + + assert_eq!(clock1.partial_cmp(&clock2), Some(Ordering::Greater)); + } + + #[test] + fn test_dep_cmp() { + let mut clock1 = HVLCClock::new(); + clock1.inner.insert(1, 2); + + let mut clock2 = HVLCClock::new(); + clock2.inner.insert(1, 1); + + assert_eq!(clock1.dep_cmp(&clock2, 1), Ordering::Greater); + assert_eq!(clock1.dep_cmp(&clock2, 2), Ordering::Equal); + } + + #[test] + fn test_reduce() { + let mut clock = HVLCClock::new(); + clock.inner.insert(1, 2); + clock.inner.insert(2, 3); + + assert_eq!(clock.reduce(), 5); + } + + #[test] + fn test_calculate_sha256() { + let mut clock = HVLCClock::new(); + clock.inner.insert(1, 2); + clock.timestamp = 100; + assert_eq!(clock.calculate_sha256(), clock.calculate_sha256()); + println!("0x{}", hex::encode(clock.calculate_sha256())); + } +} diff --git a/demos/test_vlc_net/src/server.rs b/demos/test_vlc_net/src/server.rs index bc33cd1..8f1fce0 100644 --- a/demos/test_vlc_net/src/server.rs +++ 
b/demos/test_vlc_net/src/server.rs @@ -131,6 +131,11 @@ impl Server { system_infos.refresh_memory(); let total_memory = system_infos.total_memory() as f64; + if total_memory == 0.0 { + error!("Failed to get total system memory"); + return 100.0; // Assume worst case + } + let used_memory = system_infos.used_memory() as f64; (used_memory / total_memory) * 100.0 }