From 076430b40991c6da53651828691454fd953fb391 Mon Sep 17 00:00:00 2001 From: ai-chen2050 <1033467071@qq.com> Date: Thu, 19 Sep 2024 16:45:44 +0800 Subject: [PATCH] feat: add the common tee enclave utilities, and vlc in tee And tee_vlc module verifiable logic clock is an implementation of Chronos's TEE backend. --- Cargo.toml | 7 +- crates/README.md | 5 + crates/enclaves/Cargo.toml | 17 + crates/enclaves/src/lib.rs | 1 + crates/enclaves/src/nitro_secure.rs | 155 +++++++ crates/types/Cargo.toml | 10 + crates/types/src/configuration.rs | 9 + crates/types/src/lib.rs | 2 + crates/types/src/raw_wrapper.rs | 32 ++ crates/vlc/Cargo.toml | 16 + crates/vlc/src/lib.rs | 5 +- crates/vlc/src/ordinary_clock.rs | 388 ++++++++++++++++++ demos/README.md | 10 + demos/{coll-tx => coll_tx}/Cargo.toml | 2 +- demos/{coll-tx => coll_tx}/src/lib.rs | 0 demos/{coll-tx => coll_tx}/src/simple_utxo.rs | 0 demos/tee_vlc/Cargo.toml | 35 ++ demos/tee_vlc/README.md | 46 +++ demos/tee_vlc/image/Dockerfile | 5 + demos/tee_vlc/image/run.sh | 7 + demos/tee_vlc/src/bin/call_vlc_client.rs | 194 +++++++++ demos/tee_vlc/src/bin/run-solo-vlc-enclave.rs | 71 ++++ demos/tee_vlc/src/lib.rs | 13 + demos/tee_vlc/src/main.rs | 7 + demos/tee_vlc/src/nitro_clock.rs | 292 +++++++++++++ demos/{vlc-dag => vlc_dag}/Cargo.toml | 2 +- .../src/db_client/lldb_client.rs | 0 .../src/db_client/lldb_test.rs | 0 .../{vlc-dag => vlc_dag}/src/db_client/mod.rs | 0 demos/{vlc-dag => vlc_dag}/src/lib.rs | 0 30 files changed, 1325 insertions(+), 6 deletions(-) create mode 100644 crates/enclaves/Cargo.toml create mode 100644 crates/enclaves/src/lib.rs create mode 100644 crates/enclaves/src/nitro_secure.rs create mode 100644 crates/types/Cargo.toml create mode 100644 crates/types/src/configuration.rs create mode 100644 crates/types/src/lib.rs create mode 100644 crates/types/src/raw_wrapper.rs create mode 100644 crates/vlc/src/ordinary_clock.rs rename demos/{coll-tx => coll_tx}/Cargo.toml (93%) rename demos/{coll-tx => coll_tx}/src/lib.rs (100%) rename demos/{coll-tx => coll_tx}/src/simple_utxo.rs (100%) create mode 100644 demos/tee_vlc/Cargo.toml create mode 100644 demos/tee_vlc/README.md create mode 100644 demos/tee_vlc/image/Dockerfile create mode 100644 demos/tee_vlc/image/run.sh create mode 100644 demos/tee_vlc/src/bin/call_vlc_client.rs create mode 100644 demos/tee_vlc/src/bin/run-solo-vlc-enclave.rs create mode 100644 demos/tee_vlc/src/lib.rs create mode 100644 demos/tee_vlc/src/main.rs create mode 100644 demos/tee_vlc/src/nitro_clock.rs rename demos/{vlc-dag => vlc_dag}/Cargo.toml (96%) rename demos/{vlc-dag => vlc_dag}/src/db_client/lldb_client.rs (100%) rename demos/{vlc-dag => vlc_dag}/src/db_client/lldb_test.rs (100%) rename demos/{vlc-dag => vlc_dag}/src/db_client/mod.rs (100%) rename demos/{vlc-dag => vlc_dag}/src/lib.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index f50f377..02bfa30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,12 @@ members = [ "crates/cops", "crates/vrf", "crates/crypto", + "crates/enclaves", + "crates/types", "demos/test_conflict", - "demos/coll-tx", - "demos/vlc-dag", + "demos/coll_tx", + "demos/vlc_dag", + "demos/tee_vlc", ] [profile.release] diff --git a/crates/README.md b/crates/README.md index 1fcac34..e9584bf 100644 --- a/crates/README.md +++ b/crates/README.md @@ -20,6 +20,11 @@ The crates folder of Chronos includes core functional code crates and utility li - The data store maintains a set of key-value pairs. - It provides causal consistency to clients. +## [enclaves](./enclaves/) + +- This module provides some common utilities of TEE (Trusted Execution Environment) Enclaves. +- For examples: AWS nitro enclave, Mircosoft Azure, Intel SGX, etc. + ## [crypto](./crypto/) - Some common crypto utilities, signatures, verify, and hash functions for elliptic curve. diff --git a/crates/enclaves/Cargo.toml b/crates/enclaves/Cargo.toml new file mode 100644 index 0000000..5ff57f9 --- /dev/null +++ b/crates/enclaves/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "enclaves" +version = "0.1.0" +edition = "2021" + +[features] +nitro-enclaves = ["aws-nitro-enclaves-nsm-api", "aws-nitro-enclaves-attestation"] + +[dependencies] +bincode = "1.3.3" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" +anyhow = { version = "1.0.79", features = ["backtrace"] } +nix = { version = "0.28.0", features = ["socket", "sched", "resource"] } +tokio = { version = "1.35.1", features = ["net", "time", "sync", "rt", "signal", "macros", "rt-multi-thread", "fs", "process", "io-util"] } +aws-nitro-enclaves-nsm-api = { version = "0.4.0", optional = true } +aws-nitro-enclaves-attestation = { git = "https://github.com/neatsys/aws-nitro-enclaves-attestation", version = "0.1.0", optional = true } \ No newline at end of file diff --git a/crates/enclaves/src/lib.rs b/crates/enclaves/src/lib.rs new file mode 100644 index 0000000..f4dc047 --- /dev/null +++ b/crates/enclaves/src/lib.rs @@ -0,0 +1 @@ +pub mod nitro_secure; diff --git a/crates/enclaves/src/nitro_secure.rs b/crates/enclaves/src/nitro_secure.rs new file mode 100644 index 0000000..6c30a2a --- /dev/null +++ b/crates/enclaves/src/nitro_secure.rs @@ -0,0 +1,155 @@ +use std::sync::Arc; +use std::{future::Future, pin::Pin}; +use tokio::sync::mpsc::UnboundedSender; +use tracing::warn; + +/// HandleCallbackFn is running handler behind in vsock. +/// params: input_buf, nsm (nitro secure module), pcrs, write_sender(reply sender) +pub type HandleFn = Arc< + dyn Fn( + Vec, + Arc, + [Vec; 3], + UnboundedSender>, + ) -> Pin> + Send>> + + Send + + Sync, +>; + +#[derive(Debug)] +pub struct NitroSecureModule(pub i32); + +#[cfg(feature = "nitro-enclaves")] +impl NitroSecureModule { + fn new() -> anyhow::Result { + let fd = aws_nitro_enclaves_nsm_api::driver::nsm_init(); + anyhow::ensure!(fd >= 0); + Ok(Self(fd)) + } + + pub fn process_attestation(&self, user_data: Vec) -> anyhow::Result> { + use aws_nitro_enclaves_nsm_api::api::Request::Attestation; + // some silly code to avoid explicitly mention `serde_bytes::ByteBuf` + let mut request = Attestation { + user_data: Some(Default::default()), + nonce: None, + public_key: None, + }; + let Attestation { + user_data: Some(buf), + .. + } = &mut request + else { + unreachable!() + }; + buf.extend(user_data); + match aws_nitro_enclaves_nsm_api::driver::nsm_process_request(self.0, request) { + aws_nitro_enclaves_nsm_api::api::Response::Attestation { document } => Ok(document), + aws_nitro_enclaves_nsm_api::api::Response::Error(err) => anyhow::bail!("{err:?}"), + _ => anyhow::bail!("unimplemented"), + } + } + + fn describe_pcr(&self, index: u16) -> anyhow::Result> { + use aws_nitro_enclaves_nsm_api::api::Request::DescribePCR; + match aws_nitro_enclaves_nsm_api::driver::nsm_process_request(self.0, DescribePCR { index }) + { + aws_nitro_enclaves_nsm_api::api::Response::DescribePCR { lock: _, data } => Ok(data), + aws_nitro_enclaves_nsm_api::api::Response::Error(err) => anyhow::bail!("{err:?}"), + _ => anyhow::bail!("unimplemented"), + } + } + + pub async fn run(port: u32, handler: HandleFn) -> anyhow::Result<()> { + use std::os::fd::AsRawFd; + + use nix::sys::socket::{ + bind, listen, socket, AddressFamily, Backlog, SockFlag, SockType, VsockAddr, + }; + use tokio::{ + io::{AsyncReadExt as _, AsyncWriteExt as _}, + sync::mpsc::unbounded_channel, + }; + + let nsm = std::sync::Arc::new(Self::new()?); + let pcrs = [ + nsm.describe_pcr(0)?, + nsm.describe_pcr(1)?, + nsm.describe_pcr(2)?, + ]; + + let socket_fd = socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::empty(), + None, + )?; + bind(socket_fd.as_raw_fd(), &VsockAddr::new(0xFFFFFFFF, port))?; + // theoretically this is the earliest point to entering Tokio world, but i don't want to go + // unsafe with `FromRawFd`, and Tokio don't have a `From` yet + listen(&socket_fd, Backlog::new(64)?)?; + let socket = std::os::unix::net::UnixListener::from(socket_fd); + socket.set_nonblocking(true)?; + let socket = tokio::net::UnixListener::from_std(socket)?; + + loop { + let (stream, _) = socket.accept().await?; + let (mut read_half, mut write_half) = stream.into_split(); + let (write_sender, mut write_receiver) = unbounded_channel::>(); + + let mut write_session = tokio::spawn(async move { + while let Some(buf) = write_receiver.recv().await { + write_half.write_u64_le(buf.len() as _).await?; + write_half.write_all(&buf).await?; + } + anyhow::Ok(()) + }); + let nsm = nsm.clone(); + let pcrs = pcrs.clone(); + let handler = handler.clone(); + let mut read_session = tokio::spawn(async move { + loop { + let task = async { + let len = read_half.read_u64_le().await?; + let mut buf = vec![0; len as _]; + read_half.read_exact(&mut buf).await?; + anyhow::Ok(buf) + }; + let buf = match task.await { + Ok(buf) => buf, + Err(err) => { + warn!("{err}"); + return anyhow::Ok(()); + } + }; + let nsm_clone = nsm.clone(); + let pcrs_clone = pcrs.clone(); + let write_sender = write_sender.clone(); + let handler = handler.clone(); + tokio::spawn(async move { + if let Err(err) = handler(buf, nsm_clone, pcrs_clone, write_sender).await { + eprintln!("Error: {:?}", err); + } + }); + } + }); + loop { + let result = tokio::select! { + result = &mut read_session, if !read_session.is_finished() => result, + result = &mut write_session, if !write_session.is_finished() => result, + else => break, + }; + if let Err(err) = result.map_err(Into::into).and_then(std::convert::identity) { + warn!("{err}") + } + } + } + } +} + +#[cfg(feature = "nitro-enclaves")] +impl Drop for NitroSecureModule { + fn drop(&mut self) { + aws_nitro_enclaves_nsm_api::driver::nsm_exit(self.0) + } +} diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml new file mode 100644 index 0000000..33853e4 --- /dev/null +++ b/crates/types/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "types" +version = "0.1.0" +edition = "2021" + +[dependencies] +derive_more = "0.99.17" +derive-where = "1.2.7" +serde_json = "1.0.114" +serde = { version = "1.0.195", features = ["derive"] } diff --git a/crates/types/src/configuration.rs b/crates/types/src/configuration.rs new file mode 100644 index 0000000..862aa0e --- /dev/null +++ b/crates/types/src/configuration.rs @@ -0,0 +1,9 @@ + +/// Networks consts +/// Suggested buffer size +pub const DEFAULT_MAX_DATAGRAM_SIZE: usize = 65507; + +/// Round number of a block. +pub type Round = u64; + +pub const GENESIS_ROUND: Round = 0; \ No newline at end of file diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs new file mode 100644 index 0000000..9277d13 --- /dev/null +++ b/crates/types/src/lib.rs @@ -0,0 +1,2 @@ +pub mod raw_wrapper; +pub mod configuration; \ No newline at end of file diff --git a/crates/types/src/raw_wrapper.rs b/crates/types/src/raw_wrapper.rs new file mode 100644 index 0000000..b2bf817 --- /dev/null +++ b/crates/types/src/raw_wrapper.rs @@ -0,0 +1,32 @@ +use std::{fmt::Debug, hash::Hash}; +use serde::{Deserialize, Serialize}; + +/// Payload is the vec but derive many traits, like hash, debug, clone, etc. +#[derive( + Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default, derive_more::Deref, Serialize, Deserialize, +)] +pub struct Payload(pub Vec); + +impl Debug for Payload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Ok(s) = std::str::from_utf8(&self.0) { + write!(f, "Payload(\"{s}\")") + } else { + write!( + f, + "Payload({}{})", + self.0 + .iter() + .map(|b| format!("{b:02x}")) + .take(32) + .collect::>() + .concat(), + if self.0.len() > 32 { + format!(".. ", self.0.len()) + } else { + String::new() + } + ) + } + } +} \ No newline at end of file diff --git a/crates/vlc/Cargo.toml b/crates/vlc/Cargo.toml index 7869f75..e699240 100644 --- a/crates/vlc/Cargo.toml +++ b/crates/vlc/Cargo.toml @@ -6,4 +6,20 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +sha2 = "0.10.8" +sha3 = "0.10.1" +rand = "0.8.5" +rand_distr = "0.4.3" +bincode = "1.3.3" +tracing = "0.1.40" +futures = "0.3.30" +num_cpus = "1.13.1" +derive_more = "0.99.17" +derive-where = "1.2.7" serde = { version = "1", features = ["derive"] } +anyhow = { version = "1.0.79", features = ["backtrace"] } +tracing-subscriber = "0.3.18" +secp256k1 = { version = "0.29.0", features = ["rand-std", "serde", "recovery"] } +tokio = { version = "1.35.1", features = ["net", "time", "sync", "rt", "signal", "macros", "rt-multi-thread", "fs", "process", "io-util"] } +tokio-util = "0.7.10" +crypto ={ path = "../crypto", version = "0.1.0"} diff --git a/crates/vlc/src/lib.rs b/crates/vlc/src/lib.rs index aaa6e9c..a5f1827 100644 --- a/crates/vlc/src/lib.rs +++ b/crates/vlc/src/lib.rs @@ -2,8 +2,9 @@ //! //! This crate implements a verifiable logical clock construct. The clock //! can be used in a peer-to-peer network to order events. Any node in the -//! network can verify the correctness of the clock. - +//! network can verify the correctness of the clock. And HashMap as its core +//! data structure. +pub mod ordinary_clock; use serde::{Deserialize, Serialize}; use std::cmp; use std::collections::HashMap; diff --git a/crates/vlc/src/ordinary_clock.rs b/crates/vlc/src/ordinary_clock.rs new file mode 100644 index 0000000..e8c8f64 --- /dev/null +++ b/crates/vlc/src/ordinary_clock.rs @@ -0,0 +1,388 @@ +//! This clock use the BTreeMap as its core data structure. + +use std::{cmp::Ordering, collections::BTreeMap}; +use serde::{Deserialize, Serialize}; +use sha2::{Sha256, Digest}; +use bincode::Options; + +pub trait Clock: PartialOrd + Clone + Send + Sync + 'static { + fn reduce(&self) -> LamportClock; +} + +pub type LamportClock = u64; + +impl Clock for LamportClock { + fn reduce(&self) -> LamportClock { + *self + } +} + +/// clock key_id +pub type KeyId = u64; + +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Default, derive_more::Deref, Serialize, Deserialize, +)] +pub struct OrdinaryClock(pub BTreeMap); + +impl AsRef for OrdinaryClock { + fn as_ref(&self) -> &OrdinaryClock { + self + } +} + +impl OrdinaryClock { + pub fn new() -> Self { + Self::default() + } + + pub fn is_genesis(&self) -> bool { + self.0.values().all(|n| *n == 0) + } + + fn merge(&self, other: &Self) -> Self { + let merged = self + .0 + .keys() + .chain(other.0.keys()) + .map(|id| { + let n = match (self.0.get(id), other.0.get(id)) { + (Some(n), Some(other_n)) => (*n).max(*other_n), + (Some(n), None) | (None, Some(n)) => *n, + (None, None) => unreachable!(), + }; + (*id, n) + }) + .collect(); + Self(merged) + } + + pub fn update<'a>(&'a self, others: impl Iterator, id: u64) -> Self { + let mut updated = others.fold(self.clone(), |version, dep| version.merge(dep)); + *updated.0.entry(id).or_default() += 1; + updated + } + + pub fn base<'a>(others: impl Iterator) -> Self { + let mut combined = BTreeMap::new(); + + for clock in others { + for (&key, &value) in &clock.0 { + combined + .entry(key) + .and_modify(|e: &mut u64| *e = (*e).min(value)) + .or_insert(value); + } + } + + OrdinaryClock(combined) + } + + pub fn calculate_sha256(&self) -> [u8; 32] { + let mut hasher = Sha256::new(); + let data = bincode::options().serialize(&self.0).expect("Failed to serialize data"); + // Update the hasher with the JSON string + hasher.update(data); + + // Calculate the hash & return bytes + hasher.finalize().into() + } +} + +impl PartialOrd for OrdinaryClock { + fn partial_cmp(&self, other: &Self) -> Option { + fn ge(clock: &OrdinaryClock, other_clock: &OrdinaryClock) -> bool { + for (other_id, other_n) in &other_clock.0 { + if *other_n == 0 { + continue; + } + let Some(n) = clock.0.get(other_id) else { + return false; + }; + if n < other_n { + return false; + } + } + true + } + match (ge(self, other), ge(other, self)) { + (true, true) => Some(Ordering::Equal), + (true, false) => Some(Ordering::Greater), + (false, true) => Some(Ordering::Less), + (false, false) => None, + } + } +} + +impl OrdinaryClock { + pub fn dep_cmp(&self, other: &Self, id: KeyId) -> Ordering { + match (self.0.get(&id), other.0.get(&id)) { + // disabling this check after the definition of genesis clock has been extended + // haven't revealed any bug with this assertion before, hopefully disabling it will not + // hide any bug in the future as well + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + // this can happen on the startup insertion + (None, None) => Ordering::Equal, + (Some(n), Some(m)) => n.cmp(m), + } + } +} + +impl Clock for OrdinaryClock { + fn reduce(&self) -> LamportClock { + self.0.values().copied().sum() + } +} + + +#[cfg(test)] +mod tests { + use super::*; + use std::{sync::{atomic::{AtomicUsize, Ordering}, Arc}, time::{Duration, Instant}}; + use rand::rngs::OsRng; + use futures::future::join_all; + use tokio::runtime::Builder; + use crypto::{core::DigestHash, recovery::{recover_public_key, sign_message_recover_pk}}; + + + #[test] + fn default_is_genesis() -> anyhow::Result<()> { + anyhow::ensure!(OrdinaryClock::default().is_genesis()); + Ok(()) + } + + #[test] + fn test_clock_base_func() -> anyhow::Result<()> { + let mut clock1 = BTreeMap::new(); + clock1.insert(1, 10); + clock1.insert(2, 0); + clock1.insert(3, 5); + + let mut clock2 = BTreeMap::new(); + clock2.insert(1, 0); + clock2.insert(2, 20); + clock2.insert(3, 2); + + let mut clock3 = BTreeMap::new(); + clock3.insert(1, 7); + clock3.insert(2, 15); + clock3.insert(4, 8); + + let oc1 = OrdinaryClock(clock1); + let oc2 = OrdinaryClock(clock2); + let oc3 = OrdinaryClock(clock3); + + let clocks = vec![&oc1, &oc2, &oc3]; + let base_clock = OrdinaryClock::base(clocks.into_iter()); + println!("{:?}", base_clock); // Should print: OrdinaryClock({1: 0, 2: 0, 3: 2, 4: 8}) + assert_eq!(base_clock, OrdinaryClock(BTreeMap::from([(1, 0), (2, 0), (3, 2), (4, 8)]))); + Ok(()) + } + + #[test] + fn clock_sha256() -> anyhow::Result<()> { + let mut clock = OrdinaryClock((0..4).map(|i| (i as _, 0)).collect()); + clock = clock.update(vec![OrdinaryClock::default()].iter(), 0); + println!("{:?}, {:?}", clock, clock.calculate_sha256()); + + // Tips: when clock is hashmap, this serialize and sha256 can't reproduce, every time is different. + Ok(()) + } + + #[test] + #[ignore] + fn hash_big_clock_sha256() -> anyhow::Result<()> { + let clock = OrdinaryClock((0..1<<27).map(|i| (i as _, 0)).collect()); + let start_time = Instant::now(); + let clock_hash = clock.sha256().to_fixed_bytes(); + println!("{:?}, {:?}", clock_hash, start_time.elapsed()); + Ok(()) + } + + #[tokio::test] + #[ignore] + async fn stress_raw_update() -> anyhow::Result<()> { + for size in (0..=12).step_by(2).map(|n| 1 << n) { + let num_merged = 0; + let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); + + let mut count = 0; + let start_time = Instant::now(); + let close_loops_session = async { + let mut current_clock = clock.clone(); + loop { + if start_time.elapsed() >= Duration::from_secs(10) { + break; + } + + let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + count += 1; + current_clock = updated_clock; + } + anyhow::Ok(()) + }; + + close_loops_session.await?; + println!( + "key {size},merged {num_merged}, tps {}", + count as f32 / 10. + ); + } + Ok(()) + } + + #[tokio::test] + #[ignore] + async fn stress_raw_update_concurrency() -> anyhow::Result<()> { + let core = num_cpus::get(); + let rt = Arc::new(Builder::new_multi_thread() + .worker_threads(core) + .build() + .unwrap()); + + for size in (0..=12).step_by(2).map(|n| 1 << n) { + let count = Arc::new(AtomicUsize::new(0)); + let mut tasks = Vec::new(); + let mut shifts: Vec = Vec::with_capacity(core); + for _ in 0..core { + shifts.push(size); + } + for size in shifts { + let num_merged = 0; + let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); + + let count_clone = Arc::clone(&count); + let start_time = Instant::now(); + let close_loops_session = async move { + // different clocks in different threads + let mut current_clock = clock.clone(); + loop { + if start_time.elapsed() >= Duration::from_secs(10) { + break; + } + + let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + count_clone.fetch_add(1, Ordering::Relaxed); + current_clock = updated_clock; + } + current_clock + }; + tasks.push(rt.spawn(close_loops_session)); + } + let results = join_all(tasks).await; + for result in results { + let clock = result?; + println!("key: {}, clock: {:?}", size, clock.0.get(&0)); + } + + println!( + "key {}, merged 0, tps {}", + size, + count.load(Ordering::Relaxed) as f32 / 10. + ); + } + + // Shutdown Runtime + Arc::try_unwrap(rt).unwrap().shutdown_background(); + + Ok(()) + } + + #[tokio::test] + #[ignore] + async fn stress_verify_update() -> anyhow::Result<()> { + use DigestHash as _; + + let secp = secp256k1::Secp256k1::new(); + let (secret_key, public_key) = secp.generate_keypair(&mut OsRng); + + for size in (0..=12).step_by(2).map(|n| 1 << n) { + let num_merged = 0; + let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); + let clock_hash = clock.sha256().to_fixed_bytes(); + let mut count = 0; + + // sign once + let signature_recover = sign_message_recover_pk(&secp, &secret_key, &clock.sha256().to_fixed_bytes()); + + let start_time = Instant::now(); + let close_loops_session = async { + let mut current_clock = clock.clone(); + loop { + if start_time.elapsed() >= Duration::from_secs(10) { + break; + } + + // verify + let recover_pubkey = recover_public_key(&secp, &signature_recover, &clock_hash).unwrap(); + assert_eq!(recover_pubkey, public_key); + + // update + let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + count += 1; + current_clock = updated_clock; + } + anyhow::Ok(()) + }; + + close_loops_session.await?; + println!( + "key {size},merged {num_merged}, tps {}", + count as f32 / 10. + ); + } + Ok(()) + } + + #[tokio::test] + #[ignore] + async fn stress_sig_verify_update() -> anyhow::Result<()> { + use DigestHash as _; + + let secp = secp256k1::Secp256k1::new(); + let (secret_key, public_key) = secp.generate_keypair(&mut OsRng); + + for size in (0..=12).step_by(2).map(|n| 1 << n) { + let num_merged = 0; + let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); + + let mut count = 0; + let mut signatures = None; + let start_time = Instant::now(); + let close_loops_session = async { + let mut current_clock = clock.clone(); + loop { + if start_time.elapsed() >= Duration::from_secs(10) { + break; + } + + // verify + if !signatures.is_none() { + let clock_hash = current_clock.sha256().to_fixed_bytes(); + let recover_pubkey = recover_public_key(&secp, &signatures.unwrap(), &clock_hash).unwrap(); + assert_eq!(recover_pubkey, public_key); + } + + // update + let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + count += 1; + current_clock = updated_clock; + + // sign + let signature_recover = sign_message_recover_pk(&secp, &secret_key, ¤t_clock.sha256().to_fixed_bytes()); + signatures = Some(signature_recover); + } + anyhow::Ok(()) + }; + + close_loops_session.await?; + println!( + "key {size},merged {num_merged}, tps {}", + count as f32 / 10. + ); + } + Ok(()) + } + +} \ No newline at end of file diff --git a/demos/README.md b/demos/README.md index 09f8ca6..410bc9d 100644 --- a/demos/README.md +++ b/demos/README.md @@ -25,6 +25,16 @@ Randomness serves a vital role in nearly every aspect of current society,the i #### VLC & VRF Proposal Randomness serves a vital role in nearly every aspect of current society,the idea is to intergrate the ablility of logical clocks into random generator. To generate verifiable, fair random numbers, the proposal integrates VRF. +## [tee_vlc](./tee_vlc/) + +This module verifiable logic clock is an implementation of Chronos's TEE backend. + +And some features as follow: + +* Use the aws nitro enclave as its trust execution environment. +* Support functions test and press test cases. + + ## [Test-Conflict](./test_conflict/) This use case domo is designed to detect software version conflict by applied vector clock. diff --git a/demos/coll-tx/Cargo.toml b/demos/coll_tx/Cargo.toml similarity index 93% rename from demos/coll-tx/Cargo.toml rename to demos/coll_tx/Cargo.toml index 9694360..7c4fb79 100644 --- a/demos/coll-tx/Cargo.toml +++ b/demos/coll_tx/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "coll-tx" +name = "coll_tx" version = "0.1.0" edition = "2021" diff --git a/demos/coll-tx/src/lib.rs b/demos/coll_tx/src/lib.rs similarity index 100% rename from demos/coll-tx/src/lib.rs rename to demos/coll_tx/src/lib.rs diff --git a/demos/coll-tx/src/simple_utxo.rs b/demos/coll_tx/src/simple_utxo.rs similarity index 100% rename from demos/coll-tx/src/simple_utxo.rs rename to demos/coll_tx/src/simple_utxo.rs diff --git a/demos/tee_vlc/Cargo.toml b/demos/tee_vlc/Cargo.toml new file mode 100644 index 0000000..44c62ac --- /dev/null +++ b/demos/tee_vlc/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "tee_vlc" +version = "0.1.0" +edition = "2021" + +[features] +ordinary = [ + "nitro-enclaves", + "reqwest", +] +nitro-enclaves = ["aws-nitro-enclaves-nsm-api", "aws-nitro-enclaves-attestation"] + + +[dependencies] +bincode = "1.3.3" +blake2 = "0.10.6" +bytes = "1.5.0" +derive_more = "0.99.17" +derive-where = "1.2.7" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" +rand = "0.8.5" +vlc ={ path = "../../crates/vlc", version = "0.1.0"} +types ={ path = "../../crates/types", version = "0.1.0"} +crypto ={ path = "../../crates/crypto", version = "0.1.0"} +enclaves ={ path = "../../crates/enclaves", version = "0.1.0"} +serde = { version = "1.0.195", features = ["derive"] } +nix = { version = "0.28.0", features = ["socket", "sched", "resource"] } +tikv-jemallocator = { version = "0.5.4", optional = true } +tokio = { version = "1.35.1", features = ["net", "time", "sync", "rt", "signal", "macros", "rt-multi-thread", "fs", "process", "io-util"] } +tokio-util = "0.7.10" +anyhow = { version = "1.0.79", features = ["backtrace"] } +reqwest = { version = "0.12.4", features = ["json", "multipart"], optional = true } +aws-nitro-enclaves-nsm-api = { version = "0.4.0", optional = true } +aws-nitro-enclaves-attestation = { git = "https://github.com/neatsys/aws-nitro-enclaves-attestation", version = "0.1.0", optional = true } diff --git a/demos/tee_vlc/README.md b/demos/tee_vlc/README.md new file mode 100644 index 0000000..2cc9d97 --- /dev/null +++ b/demos/tee_vlc/README.md @@ -0,0 +1,46 @@ +# VLC In TEE + +This module verifiable logic clock is an implementation of Chronos's TEE backend. + +## Prepare environment + +Now, this repository use the aws nitro enclave as its trust execution environment. + +So, please create a cloud virtual instance and notice choose the `Amazon-2023 linux` as base image. +Because this base operator system is more friendly for using of the aws nitro enclave. + +### Prepare Env & Configuration + +1. Prepare Env & install dependency tools +```sh +sudo sudo dnf upgrade +sudo dnf install -y tmux htop openssl-devel perl docker-24.0.5-1.amzn2023.0.3 aws-nitro-enclaves-cli aws-nitro-enclaves-cli-devel +``` + +2. Configuration + +Please `cat /etc/nitro_enclaves/allocator.yaml` and set cpu_count & memory_mib. For tee_vlc: just `2 core + 1024 M` is enough, for tee_llm: `4 core + 16384 M` at least. Update the file and save it. + +3. run `init.sh` + +```sh +cd scripts +sudo chmod +x init_env.sh +./init_env.sh +``` +Remember please re-run the script when you update the `/etc/nitro_enclaves/allocator.yaml`. + + +## Run VLC TEE Images + +```bash +cd image +cargo run --bin run-solo-vlc-enclave -- . --features nitro-enclaves +``` + +## Testing + +```bash +cargo run --bin call_vlc_client --features nitro-enclaves +``` + diff --git a/demos/tee_vlc/image/Dockerfile b/demos/tee_vlc/image/Dockerfile new file mode 100644 index 0000000..611d75a --- /dev/null +++ b/demos/tee_vlc/image/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine:latest + +COPY tee_vlc . + +CMD ./tee_vlc \ No newline at end of file diff --git a/demos/tee_vlc/image/run.sh b/demos/tee_vlc/image/run.sh new file mode 100644 index 0000000..ef5f530 --- /dev/null +++ b/demos/tee_vlc/image/run.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +if [ "$1" = "debug" ]; then + nitro-cli run-enclave --cpu-count 2 --memory 2048 --enclave-cid 16 --eif-path app.eif --attach-console +else + nitro-cli run-enclave --cpu-count 2 --memory 2048 --enclave-cid 16 --eif-path app.eif +fi \ No newline at end of file diff --git a/demos/tee_vlc/src/bin/call_vlc_client.rs b/demos/tee_vlc/src/bin/call_vlc_client.rs new file mode 100644 index 0000000..1f585c4 --- /dev/null +++ b/demos/tee_vlc/src/bin/call_vlc_client.rs @@ -0,0 +1,194 @@ +use std::{ + env, + fmt::Write, + future::pending, + time::Duration, +}; + +use vlc::ordinary_clock::OrdinaryClock; +use tee_vlc::nitro_clock::{nitro_enclaves_portal_session, NitroEnclavesClock, Update, UpdateOk}; +use tokio::{ + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + time::{sleep, timeout, Instant}, +}; + +// tee id +const CID: u32 = 16; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + let args: Vec = env::args().collect(); + + let num_concurrent = if args.len() > 1 { + args[1].parse::().ok() + } else { + None + }; + + let run_nitro_client = { + let (update_sender, update_receiver) = unbounded_channel(); + let (update_ok_sender, mut update_ok_receiver) = unbounded_channel::>(); + tokio::spawn({ + let update_sender = update_sender.clone(); + async move { + pending::<()>().await; + drop(update_sender) + } + }); + ( + tokio::spawn(nitro_enclaves_portal_session( + CID, + 5006, + update_receiver, + update_ok_sender, + )), + tokio::spawn(async move { + let verify = |clock: NitroEnclavesClock| { + let document = clock.verify()?; + anyhow::ensure!(document.is_some()); + Ok(()) + }; + let mut lines = String::new(); + if let Some(num_concurrent) = num_concurrent { + for size in (0..=12).step_by(2).map(|n| 1 << n) { + stress_bench_session( + size, + 0, + num_concurrent, + &update_sender, + &mut update_ok_receiver, + &mut lines, + ) + .await?; + } + // println!("{lines}") + } else { + println!("key, num_merged, deserialize_tee, verify_proof_tee, update_clock_tee, gen_clock_proof_tee, total_in_tee, net_round"); + for size in (0..=16).step_by(2).map(|n| 1 << n) { + bench_session( + size, + 0, + &update_sender, + &mut update_ok_receiver, + verify, + &mut lines, + ) + .await? + } + for num_merged in 0..=15 { + bench_session( + 1 << 10, + num_merged, + &update_sender, + &mut update_ok_receiver, + verify, + &mut lines, + ) + .await? + } + println!("{lines}") + } + + anyhow::Ok(()) + }), + ) + }; + + let (portal_session, session) = run_nitro_client; + 'select: { + tokio::select! { + result = session => break 'select result??, + result = portal_session => result??, + } + anyhow::bail!("unreachable") + } + Ok(()) +} + +async fn bench_session + Clone + Send + Sync + 'static>( + size: usize, + num_merged: usize, + update_sender: &UnboundedSender>, + update_ok_receiver: &mut UnboundedReceiver>, + verify: impl Fn(C) -> anyhow::Result<()>, + lines: &mut String, +) -> anyhow::Result<()> +where + C::Error: Into, +{ + let clock = + C::try_from(OrdinaryClock((0..size).map(|i| (i as _, 0)).collect())).map_err(Into::into)?; + let start = Instant::now(); + update_sender.send(Update(clock, Default::default(), 0))?; + let Some((_, clock, elapsed)) = update_ok_receiver.recv().await else { + anyhow::bail!("missing UpdateOk") + }; + let net_round = start.elapsed(); + println!( + "{size}, {num_merged}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?}", + elapsed[0], elapsed[1], elapsed[2], elapsed[3], elapsed[4], net_round + ); + + for _ in 0..5 { + sleep(Duration::from_millis(100)).await; + let update = Update(clock.clone(), vec![clock.clone(); num_merged], 0); + let start = Instant::now(); + update_sender.send(update)?; + let Some((_, clock, elapsed_in_tee)) = update_ok_receiver.recv().await else { + anyhow::bail!("missing UpdateOk") + }; + let elapsed = start.elapsed(); + // eprintln!("{size:8} {num_merged:3} {elapsed:?}"); + println!( + "{size}, {num_merged}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?}", + elapsed_in_tee[0], elapsed_in_tee[1], elapsed_in_tee[2], elapsed_in_tee[3], elapsed_in_tee[4], elapsed + ); + writeln!(lines, "{size},{num_merged},{}ms", elapsed.as_millis())?; + verify(clock)? + } + Ok(()) +} + +async fn stress_bench_session + Clone + Send + Sync + 'static>( + size: usize, + num_merged: usize, + num_concurrent: usize, + update_sender: &UnboundedSender>, + update_ok_receiver: &mut UnboundedReceiver>, + lines: &mut String, +) -> anyhow::Result<()> +where + C::Error: Into, +{ + let clock = + C::try_from(OrdinaryClock((0..size).map(|i| (i as _, 0)).collect())).map_err(Into::into)?; + for i in 0..num_concurrent { + update_sender.send(Update(clock.clone(), Default::default(), i as _))?; + } + let mut count = 0; + let close_loops_session = async { + while let Some((id, clock, _elapsed)) = update_ok_receiver.recv().await { + count += 1; + let update = Update(clock.clone(), vec![clock.clone(); num_merged], id); + update_sender.send(update)? + } + anyhow::Ok(()) + }; + match timeout(Duration::from_secs(10), close_loops_session).await { + Err(_) => {} + Ok(result) => { + result?; + anyhow::bail!("unreachable") + } + } + println!( + "key {size},merged {num_merged},counts {count}, tps {}", + count as f32 / 10. + ); + writeln!( + lines, + "{size},{num_merged},{count},{}", + count as f32 / 10. + )?; + Ok(()) +} diff --git a/demos/tee_vlc/src/bin/run-solo-vlc-enclave.rs b/demos/tee_vlc/src/bin/run-solo-vlc-enclave.rs new file mode 100644 index 0000000..5f4b698 --- /dev/null +++ b/demos/tee_vlc/src/bin/run-solo-vlc-enclave.rs @@ -0,0 +1,71 @@ +use tokio::process::Command; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + // the path of dockerfile and app.eif putted path + let item = std::env::args().nth(1); + println!("* Install Nitro CLI"); + let status = Command::new("sh") + .arg("-c") + .arg( + String::from("sudo dnf install -y tmux htop openssl-devel perl docker-24.0.5-1.amzn2023.0.3 aws-nitro-enclaves-cli aws-nitro-enclaves-cli-devel") + + " && sudo usermod -aG ne ec2-user" + + " && sudo usermod -aG docker ec2-user" + + " && sudo systemctl restart docker" + + " && sudo systemctl restart nitro-enclaves-allocator.service" + + " && sudo systemctl enable --now nitro-enclaves-allocator.service" + + " && sudo systemctl enable --now docker" + ) + .status() + .await?; + anyhow::ensure!(status.success()); + + println!("* Build artifact"); + let status = Command::new("cargo") + .args([ + "build", + "--target", + "x86_64-unknown-linux-musl", + "--profile", + "artifact", + "--features", + "nitro-enclaves,tikv-jemallocator", + "--bin", + "tee_vlc", + ]) + .status() + .await?; + anyhow::ensure!(status.success()); + + println!("* cp artifact"); + let status = Command::new("cp") + .arg("target/x86_64-unknown-linux-musl/artifact/tee_vlc") + .arg(item.clone().ok_or(anyhow::format_err!("missing destination path"))?) + .status() + .await?; + anyhow::ensure!(status.success()); + + println!("* cd docker folder and build enclave image file"); + let status = Command::new("sh") + .arg("-c") + .arg(format!( + "cd {} && docker build . -t tee_vlc && nitro-cli build-enclave --docker-uri tee_vlc:latest --output-file tee_vlc.eif", + item.clone().ok_or(anyhow::format_err!("missing destination path"))? + )) + .status() + .await?; + anyhow::ensure!(status.success()); + + println!("* cd dockerfile folder and run enclave image"); + let status = Command::new("sh") + .arg("-c") + .arg(format!( + "cd {} && nitro-cli run-enclave --cpu-count 2 --memory 2048 --enclave-cid 16 --eif-path tee_vlc.eif", + item.ok_or(anyhow::format_err!("missing destination path"))? + )) + .status() + .await?; + anyhow::ensure!(status.success()); + + Ok(()) +} diff --git a/demos/tee_vlc/src/lib.rs b/demos/tee_vlc/src/lib.rs new file mode 100644 index 0000000..011a655 --- /dev/null +++ b/demos/tee_vlc/src/lib.rs @@ -0,0 +1,13 @@ +pub mod nitro_clock; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Clocked { + pub clock: C, + pub inner: M, +} + +pub trait Verify: Send + Sync + 'static { + fn verify_clock(&self, num_faulty: usize, state: &S) -> anyhow::Result<()>; +} \ No newline at end of file diff --git a/demos/tee_vlc/src/main.rs b/demos/tee_vlc/src/main.rs new file mode 100644 index 0000000..d4700ff --- /dev/null +++ b/demos/tee_vlc/src/main.rs @@ -0,0 +1,7 @@ +use tee_vlc::nitro_clock::NitroEnclavesClock; + +#[tokio::main] +#[cfg(feature = "nitro-enclaves")] +async fn main() -> anyhow::Result<()> { + NitroEnclavesClock::run(5006).await +} \ No newline at end of file diff --git a/demos/tee_vlc/src/nitro_clock.rs b/demos/tee_vlc/src/nitro_clock.rs new file mode 100644 index 0000000..94e0122 --- /dev/null +++ b/demos/tee_vlc/src/nitro_clock.rs @@ -0,0 +1,292 @@ +use std::{sync::Arc, time::Duration}; +use bincode::Options; +use types::raw_wrapper::Payload; +use crypto::core::DigestHash; +use enclaves::nitro_secure::{HandleFn, NitroSecureModule as NitroSecure}; +use vlc::ordinary_clock::{Clock, LamportClock, OrdinaryClock}; +use derive_where::derive_where; +use serde::{Deserialize, Serialize}; +use tokio::{sync::mpsc::{UnboundedReceiver, UnboundedSender}, time::Instant}; +use tracing::*; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Update(pub C, pub Vec, pub u64); + +// feel lazy to define event type for replying +pub type UpdateOk = (u64, C, Vec); + +#[derive(Debug, Clone, Default, derive_more::AsRef, Serialize, Deserialize)] +#[derive_where(PartialOrd, PartialEq)] +pub struct NitroEnclavesClock { + #[as_ref] + pub plain: OrdinaryClock, + #[derive_where(skip)] + pub document: Payload, +} + +impl TryFrom for NitroEnclavesClock { + type Error = anyhow::Error; + + fn try_from(value: OrdinaryClock) -> Result { + anyhow::ensure!(value.is_genesis()); + Ok(Self { + plain: value, + document: Default::default(), + }) + } +} + +impl Clock for NitroEnclavesClock { + fn reduce(&self) -> LamportClock { + self.plain.reduce() + } +} + +// technically `feature = "aws-nitro-enclaves-attestation"` is sufficient for +// attestation, NSM API is only depended by `NitroSecureModule` that running +// inside enclaves image +#[cfg(feature = "nitro-enclaves")] +impl NitroEnclavesClock { + pub fn verify( + &self, + ) -> anyhow::Result> { + if self.plain.is_genesis() { + return Ok(None); + } + use aws_nitro_enclaves_attestation::{AttestationProcess as _, AWS_ROOT_CERT}; + use aws_nitro_enclaves_nsm_api::api::AttestationDoc; + let document = AttestationDoc::from_bytes( + &self.document, + AWS_ROOT_CERT, + std::time::SystemTime::UNIX_EPOCH + .elapsed() + .unwrap() + .as_secs(), + )?; + use DigestHash as _; + anyhow::ensure!( + document.user_data.as_ref().map(|user_data| &***user_data) + == Some(&self.plain.sha256().to_fixed_bytes()[..]) + ); + Ok(Some(document)) + } + + pub fn worker() -> HandleFn { + Arc::new(|buf, nsm, pcrs, write_sender| { + Box::pin(async move { + // IO action in tee is severe delay, just debug + // println!("Received buffer: {:?}", buf); + // let _ = io::stdout().flush(); + + // if production env, need to remove time slot log + let mut timers = Vec::new(); + if let Err(err) = async { + // 0. once action time + let full_start = Instant::now(); + + // 1. decode time + let start = Instant::now(); + let Update(prev, merged, id) = bincode::options() + .deserialize::>(&buf)?; + + let elapsed = start.elapsed(); + timers.push(elapsed); + // println!("bincode deserialize: {:?}", elapsed); + // let _ = io::stdout().flush(); + + // 2. verify clocks time + let start = Instant::now(); + for clock in [&prev].into_iter().chain(&merged) { + if let Some(document) = clock.verify()? { + for (i, pcr) in pcrs.iter().enumerate() { + anyhow::ensure!( + document.pcrs.get(&i).map(|pcr| &**pcr) == Some(pcr) + ) + } + } + } + + let elapsed = start.elapsed(); + timers.push(elapsed); + // println!("verify clock: {:?}", elapsed); + // let _ = io::stdout().flush(); + + // 3. update clock time + let start = Instant::now(); + let plain = prev + .plain + .update(merged.iter().map(|clock| &clock.plain), id); + + let elapsed = start.elapsed(); + timers.push(elapsed); + // println!("Update clock: {:?}", elapsed); + // let _ = io::stdout().flush(); + + // 4. gen clock with proof time + let start = Instant::now(); + // let key_lens = plain.0.len(); + // relies on the fact that different clocks always hash into different + // digests, hopefully true + let user_data = plain.sha256().to_fixed_bytes().to_vec(); + let document = nsm.process_attestation(user_data)?; + let updated = NitroEnclavesClock { + plain, + document: Payload(document), + }; + + let elapsed = start.elapsed(); + timers.push(elapsed); + // println!("Gen clock proof: {:?}", elapsed); + // let _ = io::stdout().flush(); + + let elapsed = full_start.elapsed(); + timers.push(elapsed); + // println!("Total once time: {:?}, key is {:?}", elapsed, key_lens); + // let _ = io::stdout().flush(); + + let buf = bincode::options().serialize(&(id, updated, timers))?; + write_sender.send(buf)?; + Ok(()) + } + .await + { + warn!("{err}") + } + Ok(()) + }) + }) + } + + pub async fn run(port: u32) -> anyhow::Result<()> { + let handler: HandleFn = NitroEnclavesClock::worker(); + + NitroSecure::run(port, handler).await + } +} + + +pub async fn nitro_enclaves_portal_session( + cid: u32, + port: u32, + mut events: UnboundedReceiver>, + sender: UnboundedSender>, +) -> anyhow::Result<()> { + use std::os::fd::AsRawFd; + + use bincode::Options; + use nix::sys::socket::{connect, socket, AddressFamily, SockFlag, SockType, VsockAddr}; + use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + + let fd = socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::empty(), + None, + )?; + // this one is blocking, but should be instant, hopefully + { + let _span = tracing::debug_span!("connect").entered(); + connect(fd.as_raw_fd(), &VsockAddr::new(cid, port))? + } + let stream = std::os::unix::net::UnixStream::from(fd); + stream.set_nonblocking(true)?; + let stream = tokio::net::UnixStream::from_std(stream)?; + let (mut read_half, mut write_half) = stream.into_split(); + let write_session = tokio::spawn(async move { + while let Some(update) = events.recv().await { + let buf = bincode::options().serialize(&update)?; + write_half.write_u64_le(buf.len() as _).await?; + write_half.write_all(&buf).await? + } + anyhow::Ok(()) + }); + let read_session = tokio::spawn(async move { + loop { + let len = read_half.read_u64_le().await?; + let mut buf = vec![0; len as _]; + read_half.read_exact(&mut buf).await?; + sender.send(bincode::options().deserialize(&buf)?)? + } + #[allow(unreachable_code)] // for type hinting + anyhow::Ok(()) + }); + tokio::select! { + result = write_session => return result?, + result = read_session => result?? + } + anyhow::bail!("unreachable") +} + +#[cfg(feature = "nitro-enclaves")] +pub mod impls { + + use super::NitroEnclavesClock; + use crate::{Clocked, Verify}; + + impl Verify<()> for Clocked { + fn verify_clock(&self, _: usize, (): &()) -> anyhow::Result<()> { + self.clock.verify()?; + Ok(()) + } + } +} + +pub fn try_connection(cid: u32, port: u32) -> anyhow::Result { + use nix::sys::socket::{connect, socket, AddressFamily, SockFlag, SockType, VsockAddr}; + use std::os::fd::AsRawFd; + + let fd = socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::empty(), + None, + )?; + + { + let _span = tracing::debug_span!("connect").entered(); + connect(fd.as_raw_fd(), &VsockAddr::new(cid, port))? + } + + let stream = std::os::unix::net::UnixStream::from(fd); + stream.set_nonblocking(true)?; + + let stream = tokio::net::UnixStream::from_std(stream)?; + Ok(stream) +} + +pub async fn tee_start_listening( + stream: tokio::net::UnixStream, + mut events: UnboundedReceiver>, + sender: UnboundedSender>, +) -> anyhow::Result<()> { + use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + + let (mut read_half, mut write_half) = stream.into_split(); + + let write_session = tokio::spawn(async move { + while let Some(prompt) = events.recv().await { + let buf = bincode::options().serialize(&prompt)?; + write_half.write_u64_le(buf.len() as _).await?; + write_half.write_all(&buf).await?; + } + anyhow::Ok(()) + }); + + let read_session = tokio::spawn(async move { + loop { + let len = read_half.read_u64_le().await?; + let mut buf = vec![0; len as _]; + read_half.read_exact(&mut buf).await?; + sender.send(bincode::options().deserialize(&buf)?)? + } + #[allow(unreachable_code)] // for type hinting + anyhow::Ok(()) + }); + + tokio::select! { + result = write_session => return result?, + result = read_session => result?? + } + + anyhow::bail!("unreachable") +} diff --git a/demos/vlc-dag/Cargo.toml b/demos/vlc_dag/Cargo.toml similarity index 96% rename from demos/vlc-dag/Cargo.toml rename to demos/vlc_dag/Cargo.toml index 14d895f..326327a 100644 --- a/demos/vlc-dag/Cargo.toml +++ b/demos/vlc_dag/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "vlc-dag" +name = "vlc_dag" version = "0.1.0" edition = "2021" diff --git a/demos/vlc-dag/src/db_client/lldb_client.rs b/demos/vlc_dag/src/db_client/lldb_client.rs similarity index 100% rename from demos/vlc-dag/src/db_client/lldb_client.rs rename to demos/vlc_dag/src/db_client/lldb_client.rs diff --git a/demos/vlc-dag/src/db_client/lldb_test.rs b/demos/vlc_dag/src/db_client/lldb_test.rs similarity index 100% rename from demos/vlc-dag/src/db_client/lldb_test.rs rename to demos/vlc_dag/src/db_client/lldb_test.rs diff --git a/demos/vlc-dag/src/db_client/mod.rs b/demos/vlc_dag/src/db_client/mod.rs similarity index 100% rename from demos/vlc-dag/src/db_client/mod.rs rename to demos/vlc_dag/src/db_client/mod.rs diff --git a/demos/vlc-dag/src/lib.rs b/demos/vlc_dag/src/lib.rs similarity index 100% rename from demos/vlc-dag/src/lib.rs rename to demos/vlc_dag/src/lib.rs