Simplify shard manager #157

Merged: 7 commits on Jan 30, 2025
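The core of this change, visible in the data/src/data_processing.rs hunk below, is that create_shard_manager now returns an Arc<ShardFileManager> directly, so call sites such as PointerFileTranslator::new (and the LocalShardClient field) no longer wrap the manager themselves. The following is a standalone sketch of that ownership pattern only, using a stand-in Manager type rather than the crate's real ShardFileManager API:

use std::sync::Arc;

// Stand-in for ShardFileManager; illustrative only.
struct Manager;

// Pattern before this PR: the constructor returns a bare value and
// every caller has to remember to wrap it in an Arc.
fn create_manager_bare() -> Manager {
    Manager
}

// Pattern after this PR: the constructor hands back the shared handle,
// so the translator, the uploader, and the test shard client can all clone it.
fn create_manager_shared() -> Arc<Manager> {
    Arc::new(Manager)
}

fn main() {
    let old_style = Arc::new(create_manager_bare()); // caller-side wrapping
    let new_style = create_manager_shared(); // Arc comes from the constructor
    let another_handle = Arc::clone(&new_style); // cheap shared ownership
    assert_eq!(Arc::strong_count(&new_style), 2);
    drop(another_handle);
    assert_eq!(Arc::strong_count(&old_style), 1);
}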
3 changes: 2 additions & 1 deletion cas_client/src/local_shard_client.rs
@@ -1,5 +1,6 @@
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_trait::async_trait;
use itertools::Itertools;
@@ -18,7 +19,7 @@ use crate::{RegistrationClient, ShardClientInterface};
/// Is intended to use for testing interactions between local repos that would normally
/// require the use of the remote shard server.
pub struct LocalShardClient {
shard_manager: ShardFileManager,
shard_manager: Arc<ShardFileManager>,
shard_directory: PathBuf,
global_dedup: DiskBasedGlobalDedupTable,
}
96 changes: 14 additions & 82 deletions data/src/bin/example.rs
@@ -1,12 +1,9 @@
use std::env::current_dir;
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};

use anyhow::Result;
use cas_client::CacheConfig;
use clap::{Args, Parser, Subcommand};
use data::configurations::*;
use data::{PointerFile, PointerFileTranslator};
@@ -73,83 +70,6 @@ fn main() {
.unwrap();
}

fn default_clean_config() -> Result<TranslatorConfig> {
let path = current_dir()?.join("xet");
fs::create_dir_all(&path)?;

let translator_config = TranslatorConfig {
file_query_policy: Default::default(),
cas_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
auth: None,
prefix: "default".into(),
cache_config: Some(CacheConfig {
cache_directory: path.join("cache"),
cache_size: 10 * 1024 * 1024 * 1024, // 10 GiB
}),
staging_directory: None,
},
shard_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
auth: None,
prefix: "default-merkledb".into(),
cache_config: Some(CacheConfig {
cache_directory: path.join("shard-cache"),
cache_size: 0, // ignored
}),
staging_directory: Some(path.join("shard-session")),
},
dedup_config: Some(DedupConfig {
repo_salt: None,
global_dedup_policy: Default::default(),
}),
repo_info: Some(RepoInfo {
repo_paths: vec!["".into()],
}),
};

translator_config.validate()?;

Ok(translator_config)
}

fn default_smudge_config() -> Result<TranslatorConfig> {
let path = current_dir()?.join("xet");
fs::create_dir_all(&path)?;

let translator_config = TranslatorConfig {
file_query_policy: Default::default(),
cas_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
auth: None,
prefix: "default".into(),
cache_config: Some(CacheConfig {
cache_directory: path.join("cache"),
cache_size: 10 * 1024 * 1024 * 1024, // 10 GiB
}),
staging_directory: None,
},
shard_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
auth: None,
prefix: "default-merkledb".into(),
cache_config: Some(CacheConfig {
cache_directory: path.join("shard-cache"),
cache_size: 0, // ignored
}),
staging_directory: Some(path.join("shard-session")),
},
dedup_config: None,
repo_info: Some(RepoInfo {
repo_paths: vec!["".into()],
}),
};

translator_config.validate()?;

Ok(translator_config)
}

async fn clean_file(arg: &CleanArg) -> Result<()> {
let reader = BufReader::new(File::open(&arg.file)?);
let writer: Box<dyn Write + Send> = match &arg.dest {
@@ -165,7 +85,13 @@ async fn clean(mut reader: impl Read, mut writer: impl Write) -> Result<()> {

let mut read_buf = vec![0u8; READ_BLOCK_SIZE];

let translator = PointerFileTranslator::new(default_clean_config()?, get_threadpool(), None, false).await?;
let translator = PointerFileTranslator::new(
TranslatorConfig::local_config(std::env::current_dir()?, true)?,
get_threadpool(),
None,
false,
)
.await?;

let handle = translator.start_clean(1024, None).await?;

@@ -213,7 +139,13 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Result<()> {
return Ok(());
}

let translator = PointerFileTranslator::new(default_smudge_config()?, get_threadpool(), None, true).await?;
let translator = PointerFileTranslator::new(
TranslatorConfig::local_config(std::env::current_dir()?, true)?,
get_threadpool(),
None,
true,
)
.await?;

translator.smudge_file_from_pointer(&pointer_file, writer, None, None).await?;

48 changes: 47 additions & 1 deletion data/src/configurations.rs
@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::str::FromStr;

use cas_client::CacheConfig;
@@ -97,6 +97,52 @@ pub struct TranslatorConfig {
}

impl TranslatorConfig {
pub fn local_config(base_dir: impl AsRef<Path>, enable_dedup: bool) -> Result<Self> {
let path = base_dir.as_ref().join("xet");
std::fs::create_dir_all(&path)?;

let translator_config = Self {
file_query_policy: Default::default(),
cas_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
auth: None,
prefix: "default".into(),
cache_config: Some(CacheConfig {
cache_directory: path.join("cache"),
cache_size: 10 * 1024 * 1024 * 1024, // 10 GiB
}),
staging_directory: None,
},
shard_storage_config: StorageConfig {
endpoint: Endpoint::FileSystem(path.join("xorbs")),
auth: None,
prefix: "default-merkledb".into(),
cache_config: Some(CacheConfig {
cache_directory: path.join("shard-cache"),
cache_size: 0, // ignored
}),
staging_directory: Some(path.join("shard-session")),
},
dedup_config: {
if enable_dedup {
Some(DedupConfig {
repo_salt: None,
global_dedup_policy: Default::default(),
})
} else {
None
}
},
repo_info: Some(RepoInfo {
repo_paths: vec!["".into()],
}),
};

translator_config.validate()?;

Ok(translator_config)
}

pub fn validate(&self) -> Result<()> {
if let Endpoint::FileSystem(path) = &self.cas_storage_config.endpoint {
std::fs::create_dir_all(path)?;
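The new TranslatorConfig::local_config(base_dir, enable_dedup) constructor replaces the two near-duplicate default_clean_config and default_smudge_config helpers previously carried by example.rs; the enable_dedup flag controls whether a DedupConfig is built. A minimal caller sketch follows, assuming the data and anyhow crates as already used in example.rs (whose own usage suggests the crate's error type converts into anyhow::Error):

use std::env::current_dir;

use anyhow::Result;
use data::configurations::TranslatorConfig;

fn build_configs() -> Result<(TranslatorConfig, TranslatorConfig)> {
    // enable_dedup = true: equivalent to the old default_clean_config().
    let clean_cfg = TranslatorConfig::local_config(current_dir()?, true)?;
    // enable_dedup = false: equivalent to the old default_smudge_config(),
    // which left dedup_config as None.
    let smudge_cfg = TranslatorConfig::local_config(current_dir()?, false)?;
    Ok((clean_cfg, smudge_cfg))
}

Note that the updated example.rs passes true on its smudge path as well; passing false is what reproduces the old behavior of leaving dedup_config unset.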
141 changes: 139 additions & 2 deletions data/src/data_processing.rs
@@ -87,7 +87,7 @@ impl PointerFileTranslator {
upload_progress_updater: Option<Arc<dyn ProgressUpdater>>,
download_only: bool,
) -> Result<PointerFileTranslator> {
let shard_manager = Arc::new(create_shard_manager(&config.shard_storage_config, download_only).await?);
let shard_manager = create_shard_manager(&config.shard_storage_config, download_only).await?;

let cas_client = create_cas_client(
&config.cas_storage_config,
@@ -194,7 +194,9 @@ impl PointerFileTranslator {
let new_cas_data = take(cas_data_accumulator.deref_mut());
drop(cas_data_accumulator); // Release the lock.

if !new_cas_data.is_empty() {
// Note: Only upload if there is new data.
if !new_cas_data.data.is_empty() {
self.xorb_uploader.register_new_cas_block(new_cas_data).await?;
}

@@ -253,3 +255,138 @@ impl PointerFileTranslator {
Ok(())
}
}

#[cfg(test)]
mod tests {

use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::Path;
use std::sync::{Arc, OnceLock};

use xet_threadpool::ThreadPool;

use crate::{PointerFile, PointerFileTranslator};

/// Return a shared threadpool to be reused as needed.
fn get_threadpool() -> Arc<ThreadPool> {
static THREADPOOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
THREADPOOL
.get_or_init(|| Arc::new(ThreadPool::new().expect("Error starting multithreaded runtime.")))
.clone()
}

/// Cleans (converts) a regular file into a pointer file.
///
/// * `input_path`: path to the original file
/// * `output_path`: path to write the pointer file
pub fn test_clean_file(runtime: Arc<ThreadPool>, input_path: &Path, output_path: &Path) {
let read_data = std::fs::read(input_path).unwrap().to_vec();

let mut pf_out = Box::new(
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(output_path)
.unwrap(),
);

runtime
.external_run_async_task(async move {
let translator = PointerFileTranslator::new(
TranslatorConfig::local_config(std::env::current_dir().unwrap(), true).unwrap(),
get_threadpool(),
None,
false,
)
.await
.unwrap();

let handle = translator.start_clean(1024, None).await.unwrap();

// Read blocks from the source file and hand them to the cleaning handle
handle.add_bytes(read_data).await.unwrap();

let pointer_file_contents = handle.result().await.unwrap();
translator.finalize_cleaning().await.unwrap();

pf_out.write_all(pointer_file_contents.as_bytes()).unwrap();
})
.unwrap();
}

/// Smudges (hydrates) a pointer file back into the original data.
///
/// * `pointer_path`: path to the pointer file
/// * `output_path`: path to write the hydrated/original file
fn test_smudge_file(runtime: Arc<ThreadPool>, pointer_path: &Path, output_path: &Path) {
let mut reader = File::open(pointer_path).unwrap();
let writer: Box<dyn Write + Send + 'static> = Box::new(
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(output_path)
.unwrap(),
);

runtime
.external_run_async_task(async move {
let mut input = String::new();
reader.read_to_string(&mut input).unwrap();

let pointer_file = PointerFile::init_from_string(&input, "");
// If not a pointer file, do nothing
if !pointer_file.is_valid() {
return;
}

let translator = PointerFileTranslator::new(
TranslatorConfig::local_config(std::env::current_dir().unwrap(), true).unwrap(),
get_threadpool(),
None,
true,
)
.await
.unwrap();

translator
.smudge_file_from_pointer(&pointer_file, &mut Box::new(writer), None, None)
.await
.unwrap();
})
.unwrap();
}

use std::fs::{read, write};

use tempfile::tempdir;

// Unit tests
use super::*;

#[test]
fn test_clean_smudge_round_trip() {
let temp = tempdir().unwrap();
let original_data = b"Hello, world!";

let runtime = get_threadpool();

// 1. Write an original file in the temp directory
let original_path = temp.path().join("original.txt");
write(&original_path, original_data).unwrap();

// 2. Clean it (convert it to a pointer file)
let pointer_path = temp.path().join("pointer.txt");
test_clean_file(runtime.clone(), &original_path, &pointer_path);

// 3. Smudge it (hydrate the pointer file) to a new file
let hydrated_path = temp.path().join("hydrated.txt");
test_smudge_file(runtime.clone(), &pointer_path, &hydrated_path);

// 4. Verify that the round-tripped file matches the original
let result_data = read(hydrated_path).unwrap();
assert_eq!(original_data.to_vec(), result_data);
}
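One design note on these tests: test_clean_file and test_smudge_file build their translators with TranslatorConfig::local_config(std::env::current_dir()...), so the xet/ working directories (cache, shard-cache, shard-session) are created under the process's current directory rather than inside the tempdir that holds the test files; the round-trip test therefore assumes a writable working directory.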
}
Loading