add feature to compare compression algorithms

seanses committed Jan 28, 2025
1 parent 22a2620 commit 22f0a77
Showing 11 changed files with 108 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cas_client/src/interface.rs
@@ -35,7 +35,7 @@ pub trait UploadClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-) -> Result<()>;
+) -> Result<usize>;

/// Check if a XORB already exists.
async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool>;
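Note: with this change, put reports how many bytes were actually sent for a XORB, which is what makes compression schemes comparable end to end. A minimal caller-side sketch, assuming a client implementing UploadClient and some prepared xorbs (all names here are placeholders, not part of this commit):

    // Sum transmitted bytes across uploads; re-running the same inputs with a
    // different CompressionScheme makes the totals directly comparable.
    let mut total_transmitted: usize = 0;
    for (hash, data, boundaries) in xorbs {
        total_transmitted += client.put("default", &hash, data, boundaries).await?;
    }
    println!("transmitted {total_transmitted} bytes");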
6 changes: 3 additions & 3 deletions cas_client/src/local_client.rs
@@ -105,7 +105,7 @@ impl UploadClient for LocalClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-) -> Result<()> {
+) -> Result<usize> {
// no empty writes
if chunk_and_boundaries.is_empty() || data.is_empty() {
return Err(CasClientError::InvalidArguments);
@@ -120,7 +120,7 @@

if self.exists(prefix, hash).await? {
info!("{prefix:?}/{hash:?} already exists in Local CAS; returning.");
-return Ok(());
+return Ok(0);
}

let file_path = self.get_path_for_entry(prefix, hash);
@@ -163,7 +163,7 @@

info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");

-Ok(())
+Ok(total_bytes_written)
}

async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
39 changes: 23 additions & 16 deletions cas_client/src/remote_client.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
-use cas_object::CasObject;
+use cas_object::{CasObject, CompressionScheme};
use cas_types::{
BatchQueryReconstructionResponse, CASReconstructionFetchInfo, CASReconstructionTerm, FileRange, HexMerkleHash,
HttpRange, Key, QueryReconstructionResponse, UploadXorbResponse,
@@ -38,6 +38,8 @@ type RangeDownloadSingleFlight = Arc<Group<(Vec<u8>, Vec<u32>), CasClientError>>

pub struct RemoteClient {
endpoint: String,
+compression: CompressionScheme,
+dry_run: bool,
http_auth_client: ClientWithMiddleware,
chunk_cache: Option<Arc<dyn ChunkCache>>,
threadpool: Arc<ThreadPool>,
@@ -48,8 +50,10 @@ impl RemoteClient {
pub fn new(
threadpool: Arc<ThreadPool>,
endpoint: &str,
+compression: CompressionScheme,
auth: &Option<AuthConfig>,
cache_config: &Option<CacheConfig>,
+dry_run: bool,
) -> Self {
// use disk cache if cache_config provided.
let chunk_cache = if let Some(cache_config) = cache_config {
@@ -67,6 +71,8 @@

Self {
endpoint: endpoint.to_string(),
+compression,
+dry_run,
http_auth_client: http_client::build_auth_http_client(auth, &None).unwrap(),
chunk_cache,
threadpool,
@@ -83,21 +89,21 @@ impl UploadClient for RemoteClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-) -> Result<()> {
+) -> Result<usize> {
let key = Key {
prefix: prefix.to_string(),
hash: *hash,
};

-let was_uploaded = self.upload(&key, data, chunk_and_boundaries).await?;
+let (was_uploaded, nbytes_trans) = self.upload(&key, data, chunk_and_boundaries).await?;

if !was_uploaded {
debug!("{key:?} not inserted into CAS.");
} else {
debug!("{key:?} inserted into CAS.");
}

-Ok(())
+Ok(nbytes_trans)
}

async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
@@ -245,29 +251,28 @@ impl RemoteClient {
key: &Key,
contents: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
-) -> Result<bool> {
+) -> Result<(bool, usize)> {
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;

let mut writer = Cursor::new(Vec::new());

-let (_, _) = CasObject::serialize(
-    &mut writer,
-    &key.hash,
-    &contents,
-    &chunk_and_boundaries,
-    cas_object::CompressionScheme::LZ4,
-)?;
+let (_, nbytes_trans) =
+    CasObject::serialize(&mut writer, &key.hash, &contents, &chunk_and_boundaries, self.compression)?;
+// free memory before the "slow" network transfer below
+drop(contents);

debug!("Upload: POST to {url:?} for {key:?}");
writer.set_position(0);
let data = writer.into_inner();

-let response = self.http_auth_client.post(url).body(data).send().await?;
-let response_parsed: UploadXorbResponse = response.json().await?;
+if !self.dry_run {
+    let response = self.http_auth_client.post(url).body(data).send().await?;
+    let response_parsed: UploadXorbResponse = response.json().await?;

-Ok(response_parsed.was_inserted)
+    Ok((response_parsed.was_inserted, nbytes_trans))
+} else {
+    Ok((true, nbytes_trans))
+}
}

/// use the reconstruction response from CAS to re-create the described file for any calls
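Note: the branch above is the core of the comparison feature. The xorb is serialized with self.compression either way, so nbytes_trans reflects the post-compression size, and with dry_run set the POST is skipped and the upload is treated as successful. A sketch of how this could be exercised offline (a sketch only: the CompressionScheme variant names are inferred from the xtool help text further down, and hash/data/boundaries are placeholders):

    // Measure what each scheme would transmit, without a CAS server.
    for scheme in [CompressionScheme::None, CompressionScheme::LZ4] {
        let client = RemoteClient::new(threadpool.clone(), "http://localhost:8080", scheme, &None, &None, true);
        let nbytes = client.put("default", &hash, data.clone(), boundaries.clone()).await?;
        println!("would transmit {nbytes} bytes");
    }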
@@ -519,7 +524,7 @@ mod tests {
build_cas_object(3, ChunkSize::Random(512, 10248), cas_object::CompressionScheme::LZ4);

let threadpool = Arc::new(ThreadPool::new().unwrap());
-let client = RemoteClient::new(threadpool.clone(), CAS_ENDPOINT, &None, &None);
+let client = RemoteClient::new(threadpool.clone(), CAS_ENDPOINT, CompressionScheme::LZ4, &None, &None, false);
// Act
let result = threadpool
.external_run_async_task(async move { client.put(prefix, &c.info.cashash, data, chunk_boundaries).await })
@@ -624,6 +629,8 @@ mod tests {
chunk_cache: Some(Arc::new(chunk_cache)),
http_auth_client: http_client.clone(),
endpoint: "".to_string(),
+compression: CompressionScheme::LZ4,
+dry_run: false,
threadpool: threadpool.clone(),
range_download_single_flight: Arc::new(Group::new(threadpool.clone())),
};
5 changes: 5 additions & 0 deletions data/src/bin/example.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, OnceLock};

use anyhow::Result;
use cas_client::CacheConfig;
+use cas_object::CompressionScheme;
use clap::{Args, Parser, Subcommand};
use data::configurations::*;
use data::{PointerFile, PointerFileTranslator};
@@ -81,6 +82,7 @@ fn default_clean_config() -> Result<TranslatorConfig> {
file_query_policy: Default::default(),
cas_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
+compression: CompressionScheme::LZ4,
auth: None,
prefix: "default".into(),
cache_config: Some(CacheConfig {
@@ -91,6 +93,7 @@
},
shard_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
+compression: Default::default(),
auth: None,
prefix: "default-merkledb".into(),
cache_config: Some(CacheConfig {
@@ -121,6 +124,7 @@ fn default_smudge_config() -> Result<TranslatorConfig> {
file_query_policy: Default::default(),
cas_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
+compression: CompressionScheme::LZ4,
auth: None,
prefix: "default".into(),
cache_config: Some(CacheConfig {
@@ -131,6 +135,7 @@
},
shard_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
+compression: Default::default(),
auth: None,
prefix: "default-merkledb".into(),
cache_config: Some(CacheConfig {
31 changes: 25 additions & 6 deletions data/src/bin/xtool.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;

use anyhow::Result;
use cas_client::build_http_client;
+use cas_object::CompressionScheme;
use clap::{Args, Parser, Subcommand};
use data::data_client::{clean_file, default_config};
use data::errors::DataProcessingError;
@@ -152,6 +153,12 @@ struct DedupArg {
/// to the file; otherwise write out to the stdout.
#[clap(short, long)]
output: Option<PathBuf>,
+/// The compression scheme to use on XORB upload. Choices are
+/// 0: no compression;
+/// 1: LZ4 compression;
+/// 2: 4 byte groups with LZ4 compression;
+#[clap(short, long)]
+compression: u8,
}

#[derive(Args)]
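Note: --compression is taken as a raw u8 and converted with try_into() in the dedup handler below, so the flag relies on a TryFrom<u8> impl on CompressionScheme. A standalone sketch of the mapping the help text implies (illustrative only — the real conversion lives in cas_object, and the variant names here are assumptions):

    use anyhow::{anyhow, Result};

    // Hypothetical equivalent of the TryFrom<u8> conversion xtool relies on.
    fn parse_compression(value: u8) -> Result<CompressionScheme> {
        match value {
            0 => Ok(CompressionScheme::None),
            1 => Ok(CompressionScheme::LZ4),
            2 => Ok(CompressionScheme::ByteGrouping4LZ4),
            _ => Err(anyhow!("unknown compression scheme: {value}")),
        }
    }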
@@ -164,8 +171,15 @@ impl Command {
async fn run(self, hub_client: HubClient, threadpool: Arc<ThreadPool>) -> Result<()> {
match self {
Command::Dedup(arg) => {
-let (all_file_info, clean_ret) =
-    dedup_files(arg.files, arg.recursive, arg.sequential, hub_client, threadpool).await?;
+let (all_file_info, clean_ret, total_bytes_trans) = dedup_files(
+    arg.files,
+    arg.recursive,
+    arg.sequential,
+    hub_client,
+    threadpool,
+    arg.compression.try_into()?,
+)
+.await?;
let mut writer: Box<dyn Write> = if let Some(path) = arg.output {
Box::new(BufWriter::new(File::options().create(true).write(true).truncate(true).open(path)?))
} else {
@@ -179,6 +193,8 @@
println!("{}: {} bytes -> {} bytes", pf.hash_string(), pf.filesize(), new_bytes);
}

+eprintln!("Transmitted {total_bytes_trans} bytes in total.");

Ok(())
},
Command::Query(arg) => query_file(arg.hash, hub_client, threadpool),
@@ -201,15 +217,18 @@ async fn dedup_files(
sequential: bool,
hub_client: HubClient,
threadpool: Arc<ThreadPool>,
-) -> Result<(Vec<MDBFileInfo>, Vec<(PointerFile, u64)>)> {
+    compression: CompressionScheme,
+) -> Result<(Vec<MDBFileInfo>, Vec<(PointerFile, u64)>, u64)> {
let token_type = "write";
let (endpoint, jwt_token, jwt_token_expiry) = hub_client.get_jwt_token(token_type).await?;
let token_refresher = Arc::new(HubClientTokenRefresher {
threadpool: threadpool.clone(),
token_type: token_type.to_owned(),
client: Arc::new(hub_client),
}) as Arc<dyn TokenRefresher>;
-let (config, _tempdir) = default_config(endpoint, Some((jwt_token, jwt_token_expiry)), Some(token_refresher))?;
+eprintln!("Using {compression} compression");
+let (config, _tempdir) =
+    default_config(endpoint, Some(compression), Some((jwt_token, jwt_token_expiry)), Some(token_refresher))?;

let num_workers = if sequential { 1 } else { threadpool.num_worker_threads() };
let processor = Arc::new(PointerFileTranslator::dry_run(config, threadpool, None, false).await?);
@@ -248,11 +267,11 @@
ParallelError::TaskError(e) => e,
})?;

-processor.finalize_cleaning().await?;
+let total_bytes_trans = processor.finalize_cleaning().await?;

let all_file_info = processor.summarize_file_info_of_session().await?;

-Ok((all_file_info, clean_ret))
+Ok((all_file_info, clean_ret, total_bytes_trans))
}

fn is_git_special_files(path: &str) -> bool {
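Note: dedup_files builds its processor with PointerFileTranslator::dry_run, so a full dedup pass measures the transmitted-byte total without uploading anything. Comparing algorithms is then a matter of running the same files once per scheme — e.g. xtool dedup --compression 0 <files> versus xtool dedup --compression 1 <files> (hypothetical invocations; the exact CLI shape depends on parts of the tool not shown in this diff) — and comparing the "Transmitted N bytes in total." lines on stderr.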
17 changes: 13 additions & 4 deletions data/src/cas_interface.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;

pub use cas_client::Client;
use cas_client::{CacheConfig, RemoteClient};
+use cas_object::CompressionScheme;
use mdb_shard::ShardFileManager;
use tracing::info;
use utils::auth::AuthConfig;
@@ -18,23 +19,31 @@ pub(crate) fn create_cas_client(
_maybe_repo_info: &Option<RepoInfo>,
shard_manager: Arc<ShardFileManager>,
threadpool: Arc<ThreadPool>,
+dry_run: bool,
) -> Result<Arc<dyn Client + Send + Sync>> {
match cas_storage_config.endpoint {
-Endpoint::Server(ref endpoint) => {
-    remote_client(endpoint, &cas_storage_config.cache_config, &cas_storage_config.auth, threadpool)
-},
+Endpoint::Server(ref endpoint) => remote_client(
+    endpoint,
+    cas_storage_config.compression,
+    &cas_storage_config.cache_config,
+    &cas_storage_config.auth,
+    threadpool,
+    dry_run,
+),
Endpoint::FileSystem(ref path) => local_test_cas_client(&cas_storage_config.prefix, path, shard_manager),
}
}

fn remote_client(
endpoint: &str,
+compression: CompressionScheme,
cache_config: &Option<CacheConfig>,
auth: &Option<AuthConfig>,
threadpool: Arc<ThreadPool>,
+dry_run: bool,
) -> Result<Arc<dyn Client + Send + Sync>> {
// Raw remote client.
-let remote_client = RemoteClient::new(threadpool, endpoint, auth, cache_config);
+let remote_client = RemoteClient::new(threadpool, endpoint, compression, auth, cache_config, dry_run);

Ok(Arc::new(remote_client))
}
2 changes: 2 additions & 0 deletions data/src/configurations.rs
@@ -2,6 +2,7 @@ use std::path::PathBuf;
use std::str::FromStr;

use cas_client::CacheConfig;
+use cas_object::CompressionScheme;
use utils::auth::AuthConfig;

use crate::errors::Result;
@@ -16,6 +17,7 @@
#[derive(Debug)]
pub struct StorageConfig {
pub endpoint: Endpoint,
+pub compression: CompressionScheme,
pub auth: Option<AuthConfig>,
pub prefix: String,
pub cache_config: Option<CacheConfig>,
15 changes: 12 additions & 3 deletions data/src/data_client.rs
@@ -7,6 +7,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use cas_client::CacheConfig;
+use cas_object::CompressionScheme;
use dirs::home_dir;
use lazy_static::lazy_static;
use merkledb::constants::IDEAL_CAS_BLOCK_SIZE;
@@ -30,9 +31,11 @@ const MAX_CONCURRENT_DOWNLOADS: usize = 8; // Download is not CPU-bound

const DEFAULT_CAS_ENDPOINT: &str = "http://localhost:8080";
const READ_BLOCK_SIZE: usize = 1024 * 1024;
+const UPLOAD_XORB_COMPRESSION: CompressionScheme = CompressionScheme::LZ4;

pub fn default_config(
endpoint: String,
+xorb_compression: Option<CompressionScheme>,
token_info: Option<(String, u64)>,
token_refresher: Option<Arc<dyn TokenRefresher>>,
) -> errors::Result<(TranslatorConfig, TempDir)> {
@@ -53,6 +56,7 @@
file_query_policy: FileQueryPolicy::ServerOnly,
cas_storage_config: StorageConfig {
endpoint: Endpoint::Server(endpoint.clone()),
+compression: xorb_compression.unwrap_or_default(),
auth: auth_cfg.clone(),
prefix: "default".into(),
cache_config: Some(CacheConfig {
@@ -63,6 +67,7 @@
},
shard_storage_config: StorageConfig {
endpoint: Endpoint::Server(endpoint),
+compression: Default::default(),
auth: auth_cfg,
prefix: "default-merkledb".into(),
cache_config: Some(CacheConfig {
@@ -98,8 +103,12 @@ pub async fn upload_async(
// produce Xorbs + Shards
// upload shards and xorbs
// for each file, return the filehash
-let (config, _tempdir) =
-    default_config(endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()), token_info, token_refresher)?;
+let (config, _tempdir) = default_config(
+    endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()),
+    Some(UPLOAD_XORB_COMPRESSION),
+    token_info,
+    token_refresher,
+)?;

let processor = Arc::new(PointerFileTranslator::new(config, threadpool, progress_updater, false).await?);

@@ -136,7 +145,7 @@
}
}
let (config, _tempdir) =
-    default_config(endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()), token_info, token_refresher)?;
+    default_config(endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()), None, token_info, token_refresher)?;

let updaters = match progress_updaters {
None => vec![None; pointer_files.len()],