diff --git a/cas_client/src/interface.rs b/cas_client/src/interface.rs
index 3250175b..3a50a613 100644
--- a/cas_client/src/interface.rs
+++ b/cas_client/src/interface.rs
@@ -35,7 +35,7 @@ pub trait UploadClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<()>;
+    ) -> Result<usize>;
 
     /// Check if a XORB already exists.
     async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool>;
diff --git a/cas_client/src/local_client.rs b/cas_client/src/local_client.rs
index 1aa26928..5ac4ebfa 100644
--- a/cas_client/src/local_client.rs
+++ b/cas_client/src/local_client.rs
@@ -105,7 +105,7 @@ impl UploadClient for LocalClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<()> {
+    ) -> Result<usize> {
        // no empty writes
        if chunk_and_boundaries.is_empty() || data.is_empty() {
            return Err(CasClientError::InvalidArguments);
@@ -120,7 +120,7 @@ impl UploadClient for LocalClient {
 
        if self.exists(prefix, hash).await? {
            info!("{prefix:?}/{hash:?} already exists in Local CAS; returning.");
-            return Ok(());
+            return Ok(0);
        }
 
        let file_path = self.get_path_for_entry(prefix, hash);
@@ -163,7 +163,7 @@ impl UploadClient for LocalClient {
 
        info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");
 
-        Ok(())
+        Ok(total_bytes_written)
    }
 
    async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
diff --git a/cas_client/src/remote_client.rs b/cas_client/src/remote_client.rs
index d362c41a..93ce9938 100644
--- a/cas_client/src/remote_client.rs
+++ b/cas_client/src/remote_client.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
-use cas_object::CasObject;
+use cas_object::{CasObject, CompressionScheme};
 use cas_types::{
     BatchQueryReconstructionResponse, CASReconstructionFetchInfo, CASReconstructionTerm, FileRange, HexMerkleHash,
     HttpRange, Key, QueryReconstructionResponse, UploadXorbResponse,
@@ -38,6 +38,8 @@ type RangeDownloadSingleFlight = Arc, Vec), CasClientError>>
 
 pub struct RemoteClient {
     endpoint: String,
+    compression: CompressionScheme,
+    dry_run: bool,
     http_auth_client: ClientWithMiddleware,
     chunk_cache: Option<Arc<dyn ChunkCache>>,
     threadpool: Arc<ThreadPool>,
@@ -48,8 +50,10 @@ impl RemoteClient {
     pub fn new(
         threadpool: Arc<ThreadPool>,
         endpoint: &str,
+        compression: CompressionScheme,
         auth: &Option<AuthConfig>,
         cache_config: &Option<CacheConfig>,
+        dry_run: bool,
     ) -> Self {
         // use disk cache if cache_config provided.
         let chunk_cache = if let Some(cache_config) = cache_config {
@@ -67,6 +71,8 @@ impl RemoteClient {
         Self {
             endpoint: endpoint.to_string(),
+            compression,
+            dry_run,
             http_auth_client: http_client::build_auth_http_client(auth, &None).unwrap(),
             chunk_cache,
             threadpool,
@@ -83,13 +89,13 @@ impl UploadClient for RemoteClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<()> {
+    ) -> Result<usize> {
         let key = Key {
             prefix: prefix.to_string(),
             hash: *hash,
         };
 
-        let was_uploaded = self.upload(&key, data, chunk_and_boundaries).await?;
+        let (was_uploaded, nbytes_trans) = self.upload(&key, data, chunk_and_boundaries).await?;
 
         if !was_uploaded {
             debug!("{key:?} not inserted into CAS.");
@@ -97,7 +103,7 @@ impl UploadClient for RemoteClient {
             debug!("{key:?} inserted into CAS.");
         }
 
-        Ok(())
+        Ok(nbytes_trans)
     }
 
     async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
@@ -245,18 +251,13 @@ impl RemoteClient {
         key: &Key,
         contents: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<bool> {
+    ) -> Result<(bool, usize)> {
         let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
 
         let mut writer = Cursor::new(Vec::new());
 
-        let (_, _) = CasObject::serialize(
-            &mut writer,
-            &key.hash,
-            &contents,
-            &chunk_and_boundaries,
-            cas_object::CompressionScheme::LZ4,
-        )?;
+        let (_, nbytes_trans) =
+            CasObject::serialize(&mut writer, &key.hash, &contents, &chunk_and_boundaries, self.compression)?;
         // free memory before the "slow" network transfer below
         drop(contents);
 
@@ -264,10 +265,14 @@ impl RemoteClient {
         writer.set_position(0);
         let data = writer.into_inner();
 
-        let response = self.http_auth_client.post(url).body(data).send().await?;
-        let response_parsed: UploadXorbResponse = response.json().await?;
+        if !self.dry_run {
+            let response = self.http_auth_client.post(url).body(data).send().await?;
+            let response_parsed: UploadXorbResponse = response.json().await?;
 
-        Ok(response_parsed.was_inserted)
+            Ok((response_parsed.was_inserted, nbytes_trans))
+        } else {
+            Ok((true, nbytes_trans))
+        }
     }
 
     /// use the reconstruction response from CAS to re-create the described file for any calls
@@ -519,7 +524,7 @@ mod tests {
             build_cas_object(3, ChunkSize::Random(512, 10248), cas_object::CompressionScheme::LZ4);
         let threadpool = Arc::new(ThreadPool::new().unwrap());
-        let client = RemoteClient::new(threadpool.clone(), CAS_ENDPOINT, &None, &None);
+        let client = RemoteClient::new(threadpool.clone(), CAS_ENDPOINT, CompressionScheme::LZ4, &None, &None, false);
         // Act
         let result = threadpool
             .external_run_async_task(async move { client.put(prefix, &c.info.cashash, data, chunk_boundaries).await })
@@ -624,6 +629,8 @@ mod tests {
             chunk_cache: Some(Arc::new(chunk_cache)),
             http_auth_client: http_client.clone(),
             endpoint: "".to_string(),
+            compression: CompressionScheme::LZ4,
+            dry_run: false,
             threadpool: threadpool.clone(),
             range_download_single_flight: Arc::new(Group::new(threadpool.clone())),
         };
diff --git a/data/src/bin/example.rs b/data/src/bin/example.rs
index 434ed751..8d1bd9eb 100644
--- a/data/src/bin/example.rs
+++ b/data/src/bin/example.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, OnceLock};
 
 use anyhow::Result;
 use cas_client::CacheConfig;
+use cas_object::CompressionScheme;
 use clap::{Args, Parser, Subcommand};
 use data::configurations::*;
 use data::{PointerFile, PointerFileTranslator};
@@ -81,6 +82,7 @@ fn default_clean_config() -> Result<TranslatorConfig> {
         file_query_policy: Default::default(),
         cas_storage_config: StorageConfig {
             endpoint: Endpoint::FileSystem(path.join("xorbs")),
+            compression: CompressionScheme::LZ4,
             auth: None,
             prefix: "default".into(),
             cache_config: Some(CacheConfig {
@@ -91,6 +93,7 @@
         },
         shard_storage_config: StorageConfig {
             endpoint: Endpoint::FileSystem(path.join("xorbs")),
+            compression: Default::default(),
             auth: None,
             prefix: "default-merkledb".into(),
             cache_config: Some(CacheConfig {
@@ -121,6 +124,7 @@ fn default_smudge_config() -> Result<TranslatorConfig> {
         file_query_policy: Default::default(),
         cas_storage_config: StorageConfig {
             endpoint: Endpoint::FileSystem(path.join("xorbs")),
+            compression: CompressionScheme::LZ4,
             auth: None,
             prefix: "default".into(),
             cache_config: Some(CacheConfig {
@@ -131,6 +135,7 @@
         },
         shard_storage_config: StorageConfig {
             endpoint: Endpoint::FileSystem(path.join("xorbs")),
+            compression: Default::default(),
             auth: None,
             prefix: "default-merkledb".into(),
             cache_config: Some(CacheConfig {
diff --git a/data/src/bin/xtool.rs b/data/src/bin/xtool.rs
index 0680612d..b0ebc97d 100644
--- a/data/src/bin/xtool.rs
+++ b/data/src/bin/xtool.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 
 use anyhow::Result;
 use cas_client::build_http_client;
+use cas_object::CompressionScheme;
 use clap::{Args, Parser, Subcommand};
 use data::data_client::{clean_file, default_config};
 use data::errors::DataProcessingError;
@@ -152,6 +153,12 @@ struct DedupArg {
     /// to the file; otherwise write out to the stdout.
     #[clap(short, long)]
     output: Option<PathBuf>,
+    /// The compression scheme to use on XORB upload. Choices are
+    /// 0: no compression;
+    /// 1: LZ4 compression;
+    /// 2: 4 byte groups with LZ4 compression;
+    #[clap(short, long)]
+    compression: u8,
 }
 
 #[derive(Args)]
@@ -164,8 +171,15 @@ impl Command {
     async fn run(self, hub_client: HubClient, threadpool: Arc<ThreadPool>) -> Result<()> {
         match self {
             Command::Dedup(arg) => {
-                let (all_file_info, clean_ret) =
-                    dedup_files(arg.files, arg.recursive, arg.sequential, hub_client, threadpool).await?;
+                let (all_file_info, clean_ret, total_bytes_trans) = dedup_files(
+                    arg.files,
+                    arg.recursive,
+                    arg.sequential,
+                    hub_client,
+                    threadpool,
+                    arg.compression.try_into()?,
+                )
+                .await?;
                 let mut writer: Box<dyn Write> = if let Some(path) = arg.output {
                     Box::new(BufWriter::new(File::options().create(true).write(true).truncate(true).open(path)?))
                 } else {
@@ -179,6 +193,8 @@ impl Command {
                     println!("{}: {} bytes -> {} bytes", pf.hash_string(), pf.filesize(), new_bytes);
                 }
 
+                eprintln!("Transmitted {total_bytes_trans} bytes in total.");
+
                 Ok(())
             },
             Command::Query(arg) => query_file(arg.hash, hub_client, threadpool),
@@ -201,7 +217,8 @@ async fn dedup_files(
     sequential: bool,
     hub_client: HubClient,
     threadpool: Arc<ThreadPool>,
-) -> Result<(Vec<MDBFileInfo>, Vec<(PointerFile, u64)>)> {
+    compression: CompressionScheme,
+) -> Result<(Vec<MDBFileInfo>, Vec<(PointerFile, u64)>, u64)> {
     let token_type = "write";
     let (endpoint, jwt_token, jwt_token_expiry) = hub_client.get_jwt_token(token_type).await?;
     let token_refresher = Arc::new(HubClientTokenRefresher {
@@ -209,7 +226,9 @@
         token_type: token_type.to_owned(),
         client: Arc::new(hub_client),
     }) as Arc<dyn TokenRefresher>;
-    let (config, _tempdir) = default_config(endpoint, Some((jwt_token, jwt_token_expiry)), Some(token_refresher))?;
+    eprintln!("Using {compression} compression");
+    let (config, _tempdir) =
+        default_config(endpoint, Some(compression), Some((jwt_token, jwt_token_expiry)), Some(token_refresher))?;
 
     let num_workers = if sequential { 1 } else { threadpool.num_worker_threads() };
     let processor = Arc::new(PointerFileTranslator::dry_run(config, threadpool, None, false).await?);
@@ -248,11 +267,11 @@ async fn dedup_files(
             ParallelError::TaskError(e) => e,
         })?;
 
-    processor.finalize_cleaning().await?;
+    let total_bytes_trans = processor.finalize_cleaning().await?;
 
     let all_file_info = processor.summarize_file_info_of_session().await?;
 
-    Ok((all_file_info, clean_ret))
+    Ok((all_file_info, clean_ret, total_bytes_trans))
 }
 
 fn is_git_special_files(path: &str) -> bool {
diff --git a/data/src/cas_interface.rs b/data/src/cas_interface.rs
index fb279a81..c7ccce66 100644
--- a/data/src/cas_interface.rs
+++ b/data/src/cas_interface.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
 
 pub use cas_client::Client;
 use cas_client::{CacheConfig, RemoteClient};
+use cas_object::CompressionScheme;
 use mdb_shard::ShardFileManager;
 use tracing::info;
 use utils::auth::AuthConfig;
@@ -18,23 +19,31 @@ pub(crate) fn create_cas_client(
     _maybe_repo_info: &Option<RepoInfo>,
     shard_manager: Arc<ShardFileManager>,
     threadpool: Arc<ThreadPool>,
+    dry_run: bool,
 ) -> Result<Arc<dyn Client + Send + Sync>> {
     match cas_storage_config.endpoint {
-        Endpoint::Server(ref endpoint) => {
-            remote_client(endpoint, &cas_storage_config.cache_config, &cas_storage_config.auth, threadpool)
-        },
+        Endpoint::Server(ref endpoint) => remote_client(
+            endpoint,
+            cas_storage_config.compression,
+            &cas_storage_config.cache_config,
+            &cas_storage_config.auth,
+            threadpool,
+            dry_run,
+        ),
         Endpoint::FileSystem(ref path) => local_test_cas_client(&cas_storage_config.prefix, path, shard_manager),
     }
 }
 
 fn remote_client(
     endpoint: &str,
+    compression: CompressionScheme,
     cache_config: &Option<CacheConfig>,
     auth: &Option<AuthConfig>,
     threadpool: Arc<ThreadPool>,
+    dry_run: bool,
 ) -> Result<Arc<dyn Client + Send + Sync>> {
     // Raw remote client.
-    let remote_client = RemoteClient::new(threadpool, endpoint, auth, cache_config);
+    let remote_client = RemoteClient::new(threadpool, endpoint, compression, auth, cache_config, dry_run);
 
     Ok(Arc::new(remote_client))
 }
diff --git a/data/src/configurations.rs b/data/src/configurations.rs
index 89575a4b..a79d59fd 100644
--- a/data/src/configurations.rs
+++ b/data/src/configurations.rs
@@ -2,6 +2,7 @@ use std::path::PathBuf;
 use std::str::FromStr;
 
 use cas_client::CacheConfig;
+use cas_object::CompressionScheme;
 use utils::auth::AuthConfig;
 
 use crate::errors::Result;
@@ -16,6 +17,7 @@ pub enum Endpoint {
 #[derive(Debug)]
 pub struct StorageConfig {
     pub endpoint: Endpoint,
+    pub compression: CompressionScheme,
     pub auth: Option<AuthConfig>,
     pub prefix: String,
     pub cache_config: Option<CacheConfig>,
diff --git a/data/src/data_client.rs b/data/src/data_client.rs
index 5e08723c..00149c61 100644
--- a/data/src/data_client.rs
+++ b/data/src/data_client.rs
@@ -7,6 +7,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use cas_client::CacheConfig;
+use cas_object::CompressionScheme;
 use dirs::home_dir;
 use lazy_static::lazy_static;
 use merkledb::constants::IDEAL_CAS_BLOCK_SIZE;
@@ -30,9 +31,11 @@ const MAX_CONCURRENT_DOWNLOADS: usize = 8; // Download is not CPU-bound
 const DEFAULT_CAS_ENDPOINT: &str = "http://localhost:8080";
 const READ_BLOCK_SIZE: usize = 1024 * 1024;
+const UPLOAD_XORB_COMPRESSION: CompressionScheme = CompressionScheme::LZ4;
 
 pub fn default_config(
     endpoint: String,
+    xorb_compression: Option<CompressionScheme>,
     token_info: Option<(String, u64)>,
     token_refresher: Option<Arc<dyn TokenRefresher>>,
 ) -> errors::Result<(TranslatorConfig, TempDir)> {
@@ -53,6 +56,7 @@ pub fn default_config(
         file_query_policy: FileQueryPolicy::ServerOnly,
         cas_storage_config: StorageConfig {
             endpoint: Endpoint::Server(endpoint.clone()),
+            compression: xorb_compression.unwrap_or_default(),
             auth: auth_cfg.clone(),
             prefix: "default".into(),
             cache_config: Some(CacheConfig {
@@ -63,6 +67,7 @@
         },
         shard_storage_config: StorageConfig {
             endpoint: Endpoint::Server(endpoint),
+            compression: Default::default(),
             auth: auth_cfg,
             prefix: "default-merkledb".into(),
             cache_config: Some(CacheConfig {
@@ -98,8 +103,12 @@ pub async fn upload_async(
     // produce Xorbs + Shards
     // upload shards and xorbs
     // for each file, return the filehash
-    let (config, _tempdir) =
-        default_config(endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()), token_info, token_refresher)?;
+    let (config, _tempdir) = default_config(
+        endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()),
+        Some(UPLOAD_XORB_COMPRESSION),
+        token_info,
+        token_refresher,
+    )?;
 
     let processor = Arc::new(PointerFileTranslator::new(config, threadpool, progress_updater, false).await?);
 
@@ -136,7 +145,7 @@ pub async fn download_async(
         }
     }
     let (config, _tempdir) =
-        default_config(endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()), token_info, token_refresher)?;
+        default_config(endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()), None, token_info, token_refresher)?;
 
     let updaters = match progress_updaters {
         None => vec![None; pointer_files.len()],
diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs
index 9677bdd8..be512402 100644
--- a/data/src/data_processing.rs
+++ b/data/src/data_processing.rs
@@ -114,6 +114,7 @@ impl PointerFileTranslator {
             &config.repo_info,
             shard_manager.clone(),
             threadpool.clone(),
+            dry_run,
         )?;
 
         let remote_shards = {
@@ -140,7 +141,6 @@ impl PointerFileTranslator {
 
         let xorb_uploader = ParallelXorbUploader::new(
             &config.cas_storage_config.prefix,
-            dry_run,
             shard_manager.clone(),
             cas_client.clone(),
             XORB_UPLOAD_RATE_LIMITER.clone(),
@@ -210,7 +210,7 @@ impl PointerFileTranslator {
             .await
     }
 
-    pub async fn finalize_cleaning(&self) -> Result<()> {
+    pub async fn finalize_cleaning(&self) -> Result<u64> {
         // flush accumulated CAS data.
         let mut cas_data_accumulator = self.global_cas_data.lock().await;
         let new_cas_data = take(cas_data_accumulator.deref_mut());
@@ -220,7 +220,7 @@ impl PointerFileTranslator {
             self.xorb_uploader.register_new_cas_block(new_cas_data).await?;
         }
 
-        self.xorb_uploader.flush().await?;
+        let total_bytes_trans = self.xorb_uploader.flush().await?;
 
         // flush accumulated memory shard.
         self.shard_manager.flush().await?;
@@ -229,7 +229,7 @@ impl PointerFileTranslator {
             self.upload_shards().await?;
         }
 
-        Ok(())
+        Ok(total_bytes_trans)
     }
 
     async fn upload_shards(&self) -> Result<()> {
diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs
index 7d3c81e9..2dc9a3b9 100644
--- a/data/src/parallel_xorb_uploader.rs
+++ b/data/src/parallel_xorb_uploader.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -19,8 +20,9 @@
 pub(crate) trait XorbUpload {
     /// Register a block of data ready for upload and dedup, return the hash of the produced xorb.
     async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result<MerkleHash>;
-    /// Flush all xorbs that are pending to be sent to remote.
-    async fn flush(&self) -> Result<()>;
+    /// Flush all xorbs that are pending to be sent to remote. Return the total number of bytes
+    /// put on network link.
+    async fn flush(&self) -> Result<u64>;
 }
 
 type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>);
@@ -34,14 +36,13 @@
 pub(crate) struct ParallelXorbUploader {
     // Configurations
     cas_prefix: String,
-    dry_run: bool,
 
     // Utils
     shard_manager: Arc<ShardFileManager>,
     cas: Arc<dyn Client + Send + Sync>,
 
     // Internal worker
-    upload_tasks: Mutex<JoinSet<Result<()>>>,
+    upload_tasks: Mutex<JoinSet<Result<usize>>>,
 
     // Rate limiter
     rate_limiter: Arc<Semaphore>,
@@ -51,12 +52,14 @@
 
     // Upload Progress
     upload_progress_updater: Option<Arc<dyn ProgressUpdater>>,
+
+    // Metrics
+    total_bytes_trans: AtomicU64,
 }
 
 impl ParallelXorbUploader {
     pub async fn new(
         cas_prefix: &str,
-        dry_run: bool,
         shard_manager: Arc<ShardFileManager>,
         cas: Arc<dyn Client + Send + Sync>,
         rate_limiter: Arc<Semaphore>,
@@ -65,20 +68,20 @@
     ) -> Arc<Self> {
         Arc::new(ParallelXorbUploader {
             cas_prefix: cas_prefix.to_owned(),
-            dry_run,
             shard_manager,
             cas,
             upload_tasks: Mutex::new(JoinSet::new()),
             rate_limiter,
             threadpool,
             upload_progress_updater,
+            total_bytes_trans: 0.into(),
         })
     }
 
     async fn status_is_ok(&self) -> Result<()> {
         let mut upload_tasks = self.upload_tasks.lock().await;
         while let Some(result) = upload_tasks.try_join_next() {
-            result??;
+            self.total_bytes_trans.fetch_add(result?? as u64, Ordering::Relaxed);
         }
 
         Ok(())
@@ -110,10 +113,9 @@ impl XorbUpload for ParallelXorbUploader {
         let mut upload_tasks = self.upload_tasks.lock().await;
         let upload_progress_updater = self.upload_progress_updater.clone();
-        let dry_run = self.dry_run;
         upload_tasks.spawn_on(
             async move {
-                let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix, dry_run).await;
+                let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
                 if let Some(updater) = upload_progress_updater {
                     updater.update(xorb_data_len as u64);
                 }
@@ -139,14 +141,14 @@
     /// Flush makes sure all xorbs added to queue before this call are sent successfully
     /// to remote. This function can be called multiple times and should be called at
     /// least once before `ParallelXorbUploader` is dropped.
-    async fn flush(&self) -> Result<()> {
+    async fn flush(&self) -> Result<u64> {
         let mut upload_tasks = self.upload_tasks.lock().await;
 
         while let Some(result) = upload_tasks.join_next().await {
-            result??;
+            self.total_bytes_trans.fetch_add(result?? as u64, Ordering::Relaxed);
        }
 
-        Ok(())
+        Ok(self.total_bytes_trans.load(Ordering::Relaxed))
    }
 }
 
@@ -155,13 +157,12 @@ async fn upload_and_register_xorb(
     shard_manager: Arc<ShardFileManager>,
     cas: Arc<dyn Client + Send + Sync>,
     cas_prefix: String,
-    dry_run: bool,
-) -> Result<()> {
+) -> Result<usize> {
     let (cas_hash, data, chunks) = item;
     let raw_bytes_len = data.len();
     // upload xorb
-    if !dry_run {
+    let nbytes_trans = {
         let mut pos = 0;
         let chunk_and_boundaries = chunks
             .iter()
             .map(|(hash, len)| {
@@ -170,8 +171,8 @@
                 pos += *len;
                 (*hash, pos as u32)
             })
             .collect();
-        cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?;
-    }
+        cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?
+    };
 
     // register for dedup
     // This should happen after uploading xorb above succeeded so not to
@@ -193,5 +194,5 @@
         shard_manager.add_cas_block(cas_info).await?;
     }
 
-    Ok(())
+    Ok(nbytes_trans)
 }
diff --git a/data/src/test_utils/local_test_client.rs b/data/src/test_utils/local_test_client.rs
index a1d6aaba..078d4e1c 100644
--- a/data/src/test_utils/local_test_client.rs
+++ b/data/src/test_utils/local_test_client.rs
@@ -41,7 +41,7 @@ impl UploadClient for LocalTestClient {
         hash: &MerkleHash,
         data: Vec<u8>,
         chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-    ) -> Result<(), CasClientError> {
+    ) -> Result<usize, CasClientError> {
         self.cas.put(prefix, hash, data, chunk_and_boundaries).await
    }
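
For orientation (not part of the patch): a minimal sketch of how the reworked upload path might be driven after this change. The `RemoteClient::new` and `put` signatures follow the diff above; the import paths, the placeholder endpoint, hash, and data, and the assumption that `UploadClient` is re-exported from the `cas_client` crate root are illustrative guesses, not verified against the full crates.

use std::sync::Arc;

use cas_client::{RemoteClient, UploadClient};
use cas_object::CompressionScheme;
use merklehash::MerkleHash;
use xet_threadpool::ThreadPool;

async fn upload_one_xorb() -> anyhow::Result<()> {
    let threadpool = Arc::new(ThreadPool::new().unwrap());

    // New constructor shape: the compression scheme and the dry_run flag are explicit.
    let client = RemoteClient::new(
        threadpool.clone(),
        "http://localhost:8080", // placeholder endpoint
        CompressionScheme::LZ4,
        &None, // auth
        &None, // cache_config
        false, // dry_run: when true, the xorb is serialized but never POSTed
    );

    // Placeholder payload: a single chunk covering the whole buffer.
    let data = vec![0u8; 1024];
    let hash = MerkleHash::default(); // placeholder hash
    let chunk_and_boundaries = vec![(hash, data.len() as u32)];

    // `put` now returns the number of bytes put on the network link
    // (the serialized, possibly compressed xorb size) instead of `()`.
    let nbytes_trans = client.put("default", &hash, data, chunk_and_boundaries).await?;
    eprintln!("Transmitted {nbytes_trans} bytes.");
    Ok(())
}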