diff --git a/cas_client/src/local_shard_client.rs b/cas_client/src/local_shard_client.rs index cd081cf..b3961e3 100644 --- a/cas_client/src/local_shard_client.rs +++ b/cas_client/src/local_shard_client.rs @@ -1,5 +1,6 @@ use std::io::Cursor; use std::path::{Path, PathBuf}; +use std::sync::Arc; use async_trait::async_trait; use itertools::Itertools; @@ -18,7 +19,7 @@ use crate::{RegistrationClient, ShardClientInterface}; /// Is intended to use for testing interactions between local repos that would normally /// require the use of the remote shard server. pub struct LocalShardClient { - shard_manager: ShardFileManager, + shard_manager: Arc<ShardFileManager>, shard_directory: PathBuf, global_dedup: DiskBasedGlobalDedupTable, } diff --git a/data/src/bin/example.rs b/data/src/bin/example.rs index 104e609..67d5e4c 100644 --- a/data/src/bin/example.rs +++ b/data/src/bin/example.rs @@ -1,12 +1,9 @@ -use std::env::current_dir; -use std::fs; use std::fs::File; use std::io::{BufReader, BufWriter, Read, Write}; use std::path::PathBuf; use std::sync::{Arc, OnceLock}; use anyhow::Result; -use cas_client::CacheConfig; use clap::{Args, Parser, Subcommand}; use data::configurations::*; use data::{PointerFile, PointerFileTranslator}; @@ -73,83 +70,6 @@ fn main() { .unwrap(); } -fn default_clean_config() -> Result<TranslatorConfig> { - let path = current_dir()?.join("xet"); - fs::create_dir_all(&path)?; - - let translator_config = TranslatorConfig { - file_query_policy: Default::default(), - cas_storage_config: StorageConfig { - endpoint: Endpoint::FileSystem(path.join("xorbs")), - auth: None, - prefix: "default".into(), - cache_config: Some(CacheConfig { - cache_directory: path.join("cache"), - cache_size: 10 * 1024 * 1024 * 1024, // 10 GiB - }), - staging_directory: None, - }, - shard_storage_config: StorageConfig { - endpoint: Endpoint::FileSystem(path.join("xorbs")), - auth: None, - prefix: "default-merkledb".into(), - cache_config: Some(CacheConfig { - cache_directory: path.join("shard-cache"), - cache_size: 0, // ignored - }), - staging_directory: Some(path.join("shard-session")), - }, - dedup_config: Some(DedupConfig { - repo_salt: None, - global_dedup_policy: Default::default(), - }), - repo_info: Some(RepoInfo { - repo_paths: vec!["".into()], - }), - }; - - translator_config.validate()?; - - Ok(translator_config) -} - -fn default_smudge_config() -> Result<TranslatorConfig> { - let path = current_dir()?.join("xet"); - fs::create_dir_all(&path)?; - - let translator_config = TranslatorConfig { - file_query_policy: Default::default(), - cas_storage_config: StorageConfig { - endpoint: Endpoint::FileSystem(path.join("xorbs")), - auth: None, - prefix: "default".into(), - cache_config: Some(CacheConfig { - cache_directory: path.join("cache"), - cache_size: 10 * 1024 * 1024 * 1024, // 10 GiB - }), - staging_directory: None, - }, - shard_storage_config: StorageConfig { - endpoint: Endpoint::FileSystem(path.join("xorbs")), - auth: None, - prefix: "default-merkledb".into(), - cache_config: Some(CacheConfig { - cache_directory: path.join("shard-cache"), - cache_size: 0, // ignored - }), - staging_directory: Some(path.join("shard-session")), - }, - dedup_config: None, - repo_info: Some(RepoInfo { - repo_paths: vec!["".into()], - }), - }; - - translator_config.validate()?; - - Ok(translator_config) -} - async fn clean_file(arg: &CleanArg) -> Result<()> { let reader = BufReader::new(File::open(&arg.file)?); let writer: Box<dyn Write> = match &arg.dest { @@ -165,7 +85,13 @@ async fn clean(mut reader: impl Read, mut writer: impl Write) -> Result<()> { let mut read_buf = vec![0u8;
READ_BLOCK_SIZE]; - let translator = PointerFileTranslator::new(default_clean_config()?, get_threadpool(), None, false).await?; + let translator = PointerFileTranslator::new( + TranslatorConfig::local_config(std::env::current_dir()?, true)?, + get_threadpool(), + None, + false, + ) + .await?; let handle = translator.start_clean(1024, None).await?; @@ -213,7 +139,13 @@ async fn smudge(mut reader: impl Read, writer: &mut Box) -> Re return Ok(()); } - let translator = PointerFileTranslator::new(default_smudge_config()?, get_threadpool(), None, true).await?; + let translator = PointerFileTranslator::new( + TranslatorConfig::local_config(std::env::current_dir()?, true)?, + get_threadpool(), + None, + true, + ) + .await?; translator.smudge_file_from_pointer(&pointer_file, writer, None, None).await?; diff --git a/data/src/configurations.rs b/data/src/configurations.rs index 89575a4..bb5646b 100644 --- a/data/src/configurations.rs +++ b/data/src/configurations.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::str::FromStr; use cas_client::CacheConfig; @@ -97,6 +97,52 @@ pub struct TranslatorConfig { } impl TranslatorConfig { + pub fn local_config(base_dir: impl AsRef<Path>, enable_dedup: bool) -> Result<Self> { + let path = base_dir.as_ref().join("xet"); + std::fs::create_dir_all(&path)?; + + let translator_config = Self { + file_query_policy: Default::default(), + cas_storage_config: StorageConfig { + endpoint: Endpoint::FileSystem(path.join("xorbs")), + auth: None, + prefix: "default".into(), + cache_config: Some(CacheConfig { + cache_directory: path.join("cache"), + cache_size: 10 * 1024 * 1024 * 1024, // 10 GiB + }), + staging_directory: None, + }, + shard_storage_config: StorageConfig { + endpoint: Endpoint::FileSystem(path.join("xorbs")), + auth: None, + prefix: "default-merkledb".into(), + cache_config: Some(CacheConfig { + cache_directory: path.join("shard-cache"), + cache_size: 0, // ignored + }), + staging_directory: Some(path.join("shard-session")), + }, + dedup_config: { + if enable_dedup { + Some(DedupConfig { + repo_salt: None, + global_dedup_policy: Default::default(), + }) + } else { + None + } + }, + repo_info: Some(RepoInfo { + repo_paths: vec!["".into()], + }), + }; + + translator_config.validate()?; + + Ok(translator_config) + } + pub fn validate(&self) -> Result<()> { if let Endpoint::FileSystem(path) = &self.cas_storage_config.endpoint { std::fs::create_dir_all(path)?; diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index 474e042..0999c58 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -87,7 +87,7 @@ impl PointerFileTranslator { upload_progress_updater: Option<Arc<dyn ProgressUpdater>>, download_only: bool, ) -> Result<PointerFileTranslator> { - let shard_manager = Arc::new(create_shard_manager(&config.shard_storage_config, download_only).await?); + let shard_manager = create_shard_manager(&config.shard_storage_config, download_only).await?; let cas_client = create_cas_client( &config.cas_storage_config, @@ -194,7 +194,9 @@ impl PointerFileTranslator { let new_cas_data = take(cas_data_accumulator.deref_mut()); drop(cas_data_accumulator); // Release the lock.
- if !new_cas_data.is_empty() { + // Note: Only upload if there is new data; a new file may be composed entirely of chunks + // that already exist in other xorbs, in which case there is nothing new to upload. + if !new_cas_data.data.is_empty() { self.xorb_uploader.register_new_cas_block(new_cas_data).await?; } @@ -253,3 +255,138 @@ impl PointerFileTranslator { Ok(()) } } + +#[cfg(test)] +mod tests { + + use std::fs::{File, OpenOptions}; + use std::io::{Read, Write}; + use std::path::Path; + use std::sync::{Arc, OnceLock}; + + use xet_threadpool::ThreadPool; + + use crate::{PointerFile, PointerFileTranslator}; + + /// Return a shared threadpool to be reused as needed. + fn get_threadpool() -> Arc<ThreadPool> { + static THREADPOOL: OnceLock<Arc<ThreadPool>> = OnceLock::new(); + THREADPOOL + .get_or_init(|| Arc::new(ThreadPool::new().expect("Error starting multithreaded runtime."))) + .clone() + } + + /// Cleans (converts) a regular file into a pointer file. + /// + /// * `input_path`: path to the original file + /// * `output_path`: path to write the pointer file + pub fn test_clean_file(runtime: Arc<ThreadPool>, input_path: &Path, output_path: &Path) { + let read_data = std::fs::read(input_path).unwrap().to_vec(); + + let mut pf_out = Box::new( + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(output_path) + .unwrap(), + ); + + runtime + .external_run_async_task(async move { + let translator = PointerFileTranslator::new( + TranslatorConfig::local_config(std::env::current_dir().unwrap(), true).unwrap(), + get_threadpool(), + None, + false, + ) + .await + .unwrap(); + + let handle = translator.start_clean(1024, None).await.unwrap(); + + // Read blocks from the source file and hand them to the cleaning handle + handle.add_bytes(read_data).await.unwrap(); + + let pointer_file_contents = handle.result().await.unwrap(); + translator.finalize_cleaning().await.unwrap(); + + pf_out.write_all(pointer_file_contents.as_bytes()).unwrap(); + }) + .unwrap(); + } + + /// Smudges (hydrates) a pointer file back into the original data. + /// + /// * `pointer_path`: path to the pointer file + /// * `output_path`: path to write the hydrated/original file + fn test_smudge_file(runtime: Arc<ThreadPool>, pointer_path: &Path, output_path: &Path) { + let mut reader = File::open(pointer_path).unwrap(); + let writer: Box<dyn Write + Send> = Box::new( + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(output_path) + .unwrap(), + ); + + runtime + .external_run_async_task(async move { + let mut input = String::new(); + reader.read_to_string(&mut input).unwrap(); + + let pointer_file = PointerFile::init_from_string(&input, ""); + // If not a pointer file, do nothing + if !pointer_file.is_valid() { + return; + } + + let translator = PointerFileTranslator::new( + TranslatorConfig::local_config(std::env::current_dir().unwrap(), true).unwrap(), + get_threadpool(), + None, + true, + ) + .await + .unwrap(); + + translator + .smudge_file_from_pointer(&pointer_file, &mut Box::new(writer), None, None) + .await + .unwrap(); + }) + .unwrap(); + } + + use std::fs::{read, write}; + + use tempfile::tempdir; + + /// Unit tests + use super::*; + + #[test] + fn test_clean_smudge_round_trip() { + let temp = tempdir().unwrap(); + let original_data = b"Hello, world!"; + + let runtime = get_threadpool(); + + // 1. Write an original file in the temp directory + let original_path = temp.path().join("original.txt"); + write(&original_path, original_data).unwrap(); + + // 2. Clean it (convert it to a pointer file) + let pointer_path = temp.path().join("pointer.txt"); + test_clean_file(runtime.clone(), &original_path, &pointer_path); + + // 3.
Smudge it (hydrate the pointer file) to a new file + let hydrated_path = temp.path().join("hydrated.txt"); + test_smudge_file(runtime.clone(), &pointer_path, &hydrated_path); + + // 4. Verify that the round-tripped file matches the original + let result_data = read(hydrated_path).unwrap(); + assert_eq!(original_data.to_vec(), result_data); + } +} diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index bf86860..fe76508 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -87,6 +87,8 @@ impl XorbUpload for ParallelXorbUploader { async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result<MerkleHash> { self.status_is_ok().await?; + // Only upload a new xorb if there is new data; it may be that a new file is formed only + // from existing chunks. let xorb_data_len = cas_data.data.len(); let cas_hash = cas_node_hash(&cas_data.chunks[..]); @@ -110,8 +112,10 @@ impl XorbUpload for ParallelXorbUploader { upload_tasks.spawn_on( async move { let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await; - if let Some(updater) = upload_progress_updater { - updater.update(xorb_data_len as u64); + if ret.is_ok() { + if let Some(updater) = upload_progress_updater { + updater.update(xorb_data_len as u64); + } } drop(permit); ret @@ -165,7 +169,7 @@ async fn upload_and_register_xorb( (*hash, pos as u32) }) .collect(); cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?; } // register for dedup @@ -185,7 +189,7 @@ async fn upload_and_register_xorb( .collect(); let cas_info = MDBCASInfo { metadata, chunks }; shard_manager.add_cas_block(cas_info).await?; } Ok(()) diff --git a/data/src/remote_shard_interface.rs b/data/src/remote_shard_interface.rs index 2fc6909..7a1066e 100644 --- a/data/src/remote_shard_interface.rs +++ b/data/src/remote_shard_interface.rs @@ -69,7 +69,7 @@ impl RemoteShardInterface { }; let shard_manager = if file_query_policy != FileQueryPolicy::ServerOnly && shard_manager.is_none() { - Some(Arc::new(create_shard_manager(shard_storage_config, download_only).await?)) + Some(create_shard_manager(shard_storage_config, download_only).await?)
} else { shard_manager }; @@ -253,7 +253,7 @@ impl RemoteShardInterface { let shard_file = cache_dir.join(local_shard_name(shard_hash)); - shard_manager.load_and_cleanup_shards_by_path(&[shard_file]).await?; + shard_manager.register_shards_by_path(&[shard_file]).await?; Ok(()) } diff --git a/data/src/shard_interface.rs b/data/src/shard_interface.rs index 5426f99..bcead6a 100644 --- a/data/src/shard_interface.rs +++ b/data/src/shard_interface.rs @@ -11,7 +11,7 @@ use super::errors::Result; pub async fn create_shard_manager( shard_storage_config: &StorageConfig, download_only_mode: bool, -) -> Result<ShardFileManager> { +) -> Result<Arc<ShardFileManager>> { let shard_session_directory = shard_storage_config .staging_directory .as_ref() @@ -25,7 +25,7 @@ pub async fn create_shard_manager( let shard_manager = ShardFileManager::load_dir(shard_session_directory, download_only_mode).await?; if shard_cache_directory.exists() { - shard_manager.load_and_cleanup_shards_by_path(&[shard_cache_directory]).await?; + shard_manager.register_shards_by_path(&[shard_cache_directory]).await?; } else { warn!("Merkle DB Cache path {:?} does not exist, skipping registration.", shard_cache_directory); } diff --git a/mdb_shard/src/constants.rs b/mdb_shard/src/constants.rs index 2690d84..dd9c48e 100644 --- a/mdb_shard/src/constants.rs +++ b/mdb_shard/src/constants.rs @@ -5,7 +5,7 @@ pub const MDB_SHARD_GLOBAL_DEDUP_CHUNK_MODULUS: u64 = 1024; /// The amount of time a shard should be expired by before it's deleted, in seconds. /// By default set to 7 days. -pub const MDB_SHARD_EXPIRATION_BUFFER: u64 = 7 * 24 * 3600; +pub const MDB_SHARD_EXPIRATION_BUFFER_SECS: u64 = 7 * 24 * 3600; // How the MDB_SHARD_GLOBAL_DEDUP_CHUNK_MODULUS is used. pub fn hash_is_global_dedup_eligible(h: &merklehash::MerkleHash) -> bool { diff --git a/mdb_shard/src/shard_file_manager.rs b/mdb_shard/src/shard_file_manager.rs index 02784f1..a6d68a8 100644 --- a/mdb_shard/src/shard_file_manager.rs +++ b/mdb_shard/src/shard_file_manager.rs @@ -1,16 +1,15 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use lazy_static::lazy_static; use merklehash::{HMACKey, MerkleHash}; use tokio::sync::RwLock; -use tracing::{debug, error, info, trace}; +use tracing::{debug, info, trace}; use crate::cas_structs::*; -use crate::constants::{MDB_SHARD_EXPIRATION_BUFFER, MDB_SHARD_MIN_TARGET_SIZE}; +use crate::constants::{MDB_SHARD_EXPIRATION_BUFFER_SECS, MDB_SHARD_MIN_TARGET_SIZE}; use crate::error::{MDBShardError, Result}; use crate::file_structs::*; use crate::shard_file::current_timestamp; @@ -19,23 +18,6 @@ use crate::shard_file_reconstructor::FileReconstructor; use crate::shard_in_memory::MDBInMemoryShard; use crate::utils::truncate_hash; -/// A wrapper struct for the in-memory shard to make sure that it gets flushed on teardown. -struct MDBShardFlushGuard { - shard: MDBInMemoryShard, - session_directory: Option<PathBuf>, - } - -impl Drop for MDBShardFlushGuard { - fn drop(&mut self) { - if !self.shard.is_empty() { - // This is only supposed to happen on task cancellations, so we should - if cfg!(debug_assertions) { - eprintln!("[Debug] Warning: Shard dropped while data still present! This is an error outside of task cancellation."); - } - } - } -} - // Store a maximum of this many indices in memory const CHUNK_INDEX_TABLE_DEFAULT_MAX_SIZE: usize = 64 * 1024 * 1024; lazy_static!
{ @@ -93,11 +75,56 @@ impl ShardBookkeeper { } } +pub struct SFMBuildParameters { + shard_directory: PathBuf, + clean_expired_shards: bool, + target_shard_size: u64, + chunk_dedup_enabled: bool, + shard_expiration_delete_buffer_secs: u64, +} + +impl SFMBuildParameters { + pub fn new(shard_directory: impl AsRef<Path>) -> Self { + Self { + shard_directory: shard_directory.as_ref().to_path_buf(), + clean_expired_shards: true, + target_shard_size: MDB_SHARD_MIN_TARGET_SIZE, + chunk_dedup_enabled: true, + shard_expiration_delete_buffer_secs: MDB_SHARD_EXPIRATION_BUFFER_SECS, + } + } + + pub fn with_chunk_dedup(mut self, chunk_dedup_enabled: bool) -> Self { + self.chunk_dedup_enabled = chunk_dedup_enabled; + self + } + pub fn with_target_size(mut self, target_size: u64) -> Self { + self.target_shard_size = target_size; + self + } + + pub fn with_expired_shard_cleanup(mut self, cleanup: bool) -> Self { + self.clean_expired_shards = cleanup; + self + } + + pub fn with_shard_expiration_delete_buffer(mut self, nsecs: u64) -> Self { + self.shard_expiration_delete_buffer_secs = nsecs; + self + } + + pub async fn build(self) -> Result<Arc<ShardFileManager>> { + Ok(ShardFileManager::new_from_builder(self).await?.0) + } +} + pub struct ShardFileManager { - shard_bookkeeper: Arc<RwLock<ShardBookkeeper>>, - current_state: Arc<RwLock<MDBShardFlushGuard>>, + shard_bookkeeper: RwLock<ShardBookkeeper>, + current_state: RwLock<MDBInMemoryShard>, + + shard_directory: PathBuf, target_shard_min_size: u64, - chunk_dedup_disabled: bool, + chunk_dedup_enabled: bool, } /// Shard file manager to manage all the shards. It is fully thread-safe and async enabled. @@ -118,99 +145,69 @@ pub struct ShardFileManager { /// /// // new_shards is the list of new shards for this session. impl ShardFileManager { - /// Creates a new shard file manager at the - pub async fn new( - session_directory: &Path, - load_and_clean_directory: bool, - disable_chunk_dedup: bool, - ) -> Result<Self> { - let session_directory = { - if session_directory == PathBuf::default() { - None - } else { - Some(std::fs::canonicalize(session_directory).map_err(|e| { - error!("Error accessing session directory {session_directory:?}: {e:?}"); - e - })?) - } - }; - - let s = Self { - shard_bookkeeper: Arc::new(RwLock::new(ShardBookkeeper::new())), - current_state: Arc::new(RwLock::new(MDBShardFlushGuard { - shard: MDBInMemoryShard::default(), - session_directory: session_directory.clone(), - })), - target_shard_min_size: MDB_SHARD_MIN_TARGET_SIZE, - chunk_dedup_disabled: disable_chunk_dedup, - }; - - if load_and_clean_directory { - if let Some(sd) = &session_directory { - s.load_and_cleanup_shards_by_path(&[sd]).await?; - } - } - - Ok(s) + pub fn builder(shard_directory: impl AsRef<Path>) -> SFMBuildParameters { + SFMBuildParameters::new(shard_directory) } /// Construct a new shard file manager that uses session_directory as the temporary dumping - pub async fn load_dir(session_directory: &Path, disable_chunk_dedup: bool) -> Result<Self> { - Self::new(session_directory, true, disable_chunk_dedup).await + pub async fn load_dir(shard_directory: &Path, disable_chunk_dedup: bool) -> Result<Arc<Self>> { + Self::builder(shard_directory) + .with_chunk_dedup(!disable_chunk_dedup) + .build() + .await } - /// Sets the target value of a shard file size.
By default, it is given by MDB_SHARD_MIN_TARGET_SIZE - pub fn set_target_shard_min_size(&mut self, s: u64) { - self.target_shard_min_size = s; - } + pub async fn new_from_builder(sbp: SFMBuildParameters) -> Result<(Arc<Self>, Vec<Arc<MDBShardFile>>)> { + let shard_directory = &sbp.shard_directory; - /// Registers all the files in a directory with filenames matching the names - /// of an MerkleDB shard. - pub async fn register_shards_by_path<P: AsRef<Path>>( - &self, - paths: &[P], - allow_expired_deletion: bool, - expiration_deletion_buffer: Duration, - ) -> Result<Vec<Arc<MDBShardFile>>> { let mut new_shards = Vec::new(); let current_time = current_timestamp(); - let expiration_deletion_buffer_secs = expiration_deletion_buffer.as_secs(); let mut deletion_candidates = Vec::new(); - for p in paths { - let shard_files = MDBShardFile::load_all(p.as_ref())?; + let shard_files = MDBShardFile::load_all(shard_directory)?; - // Now, go through and filter out the ones that can't be used any more, and also filter out the ones that - // can't be - new_shards.extend(shard_files.into_iter().filter_map(|s| { - let expiry_time = s.shard.metadata.shard_key_expiry; - if current_time < expiry_time { - Some(s) - } else { - if allow_expired_deletion && expiry_time + expiration_deletion_buffer_secs <= current_time { - deletion_candidates.push(s.path.clone()); - } - - None + // Now, go through and filter out the ones that can't be used any more, and also filter out the ones that + // can't be + new_shards.extend(shard_files.into_iter().filter_map(|s| { + let expiry_time = s.shard.metadata.shard_key_expiry; + if current_time < expiry_time { + Some(s) + } else { + if sbp.clean_expired_shards && expiry_time + sbp.shard_expiration_delete_buffer_secs <= current_time { + deletion_candidates.push(s.path.clone()); } - })); - } + + None + } + })); for p in deletion_candidates { // silently delete expired shard files, swallowing errors if need be. let _ = std::fs::remove_file(p); } - self.register_shards(&new_shards).await?; - Ok(new_shards) + let s = Self { + shard_bookkeeper: RwLock::new(ShardBookkeeper::new()), + current_state: RwLock::new(MDBInMemoryShard::default()), + shard_directory: sbp.shard_directory, + target_shard_min_size: sbp.target_shard_size, + chunk_dedup_enabled: sbp.chunk_dedup_enabled, + }; + + s.register_shards(&new_shards).await?; + + Ok((Arc::new(s), new_shards)) } - /// This is a wrapper function to register_shards_by_path that uses the defaults above. - pub async fn load_and_cleanup_shards_by_path<P: AsRef<Path>>(&self, paths: &[P]) -> Result<Vec<Arc<MDBShardFile>>> { - self.register_shards_by_path(paths, true, Duration::new(MDB_SHARD_EXPIRATION_BUFFER, 0)) - .await + pub async fn register_shards_by_path<P: AsRef<Path>>(&self, new_shards: &[P]) -> Result<()> { + let new_shards: Vec<Arc<MDBShardFile>> = new_shards.iter().try_fold(Vec::new(), |mut acc, p| { + acc.extend(MDBShardFile::load_all(p.as_ref())?); + Result::Ok(acc) + })?; + + self.register_shards(&new_shards).await } pub async fn register_shards(&self, new_shards: &[Arc<MDBShardFile>]) -> Result<()> { @@ -244,7 +241,7 @@ impl ShardFileManager { } let update_chunk_lookup = - !self.chunk_dedup_disabled && sbkp_lg.total_indexed_chunks < *CHUNK_INDEX_TABLE_MAX_SIZE; + self.chunk_dedup_enabled && sbkp_lg.total_indexed_chunks < *CHUNK_INDEX_TABLE_MAX_SIZE; // Now add in the chunk indices. let shard_index; @@ -321,7 +318,7 @@ impl FileReconstructor for ShardFileManager { // First attempt the in-memory version of this.
{ let lg = self.current_state.read().await; - let file_info = lg.shard.get_file_reconstruction_info(file_hash); + let file_info = lg.get_file_reconstruction_info(file_hash); if let Some(fi) = file_info { return Ok(Some((fi, None))); } @@ -351,7 +348,7 @@ impl ShardFileManager { &self, query_hashes: &[MerkleHash], ) -> Result<Option<(usize, FileDataSequenceEntry)>> { - if self.chunk_dedup_disabled { + if !self.chunk_dedup_enabled { return Err(MDBShardError::Other( "Logic Error: shard_manager not initialized for dedup but dedup attempted.".to_owned(), )); @@ -360,7 +357,7 @@ impl ShardFileManager { // First attempt the in-memory version of this. { let lg = self.current_state.read().await; - let ret = lg.shard.chunk_hash_dedup_query(query_hashes); + let ret = lg.chunk_hash_dedup_query(query_hashes); if ret.is_some() { return Ok(ret); } @@ -395,11 +392,14 @@ impl ShardFileManager { pub async fn add_cas_block(&self, cas_block_contents: MDBCASInfo) -> Result<()> { let mut lg = self.current_state.write().await; - if lg.shard.shard_file_size() + cas_block_contents.num_bytes() >= self.target_shard_min_size { - self.flush_internal(&mut lg).await?; - } + lg.add_cas_block(cas_block_contents)?; - lg.shard.add_cas_block(cas_block_contents)?; + // See if this put it over the target minimum size, allowing us to cut a new shard + if lg.shard_file_size() >= self.target_shard_min_size { + // Drop the lock guard before doing the flush. + drop(lg); + self.flush().await?; + } Ok(()) } @@ -408,53 +408,41 @@ impl ShardFileManager { pub async fn add_file_reconstruction_info(&self, file_info: MDBFileInfo) -> Result<()> { let mut lg = self.current_state.write().await; - if lg.shard.shard_file_size() + file_info.num_bytes() >= self.target_shard_min_size { - self.flush_internal(&mut lg).await?; - } + lg.add_file_reconstruction_info(file_info)?; - lg.shard.add_file_reconstruction_info(file_info)?; + // See if this put it over the target minimum size, allowing us to cut a new shard + if lg.shard_file_size() >= self.target_shard_min_size { + // Drop the lock guard before doing the flush. + drop(lg); + self.flush().await?; + } Ok(()) } - async fn flush_internal( - &self, - mem_shard: &mut tokio::sync::RwLockWriteGuard<'_, MDBShardFlushGuard>, - ) -> Result<Option<PathBuf>> { - Ok(if let Some(path) = mem_shard.flush()? { - self.load_and_cleanup_shards_by_path(&[&path]).await?; - Some(path) - } else { - None - }) - } - /// Flush the current state of the in-memory lookups to a shard in the session directory, /// returning the hash of the shard and the file written, or None if no file was written. pub async fn flush(&self) -> Result<Option<PathBuf>> { - let mut lg = self.current_state.write().await; - self.flush_internal(&mut lg).await - } -} + let new_shard_path; -impl MDBShardFlushGuard { - // The flush logic is put here so that this can be done both manually and by drop - pub fn flush(&mut self) -> Result<Option<PathBuf>> { - if self.shard.is_empty() { - return Ok(None); - } + // The locked section here.
+ { + let mut lg = self.current_state.write().await; - if let Some(sd) = &self.session_directory { - let path = self.shard.write_to_directory(sd)?; - self.shard = MDBInMemoryShard::default(); + if lg.is_empty() { + return Ok(None); + } - info!("Shard manager flushed new shard to {path:?}."); + new_shard_path = lg.write_to_directory(&self.shard_directory)?; + *lg = MDBInMemoryShard::default(); - Ok(Some(path)) - } else { - debug!("Shard manager in ephemeral mode; skipping flush to disk."); - Ok(None) + info!("Shard manager flushed new shard to {new_shard_path:?}."); } + + // Load this one into our local shard catalog + self.register_shards(&[MDBShardFile::load_from_file(&new_shard_path)?]).await?; + + Ok(Some(new_shard_path)) } } @@ -465,7 +453,7 @@ impl ShardFileManager { let mut bytes = 0; { let lg = self.current_state.read().await; - bytes += lg.shard.materialized_bytes(); + bytes += lg.materialized_bytes(); } for ksc in self.shard_bookkeeper.read().await.shard_collections.iter() { @@ -482,7 +470,7 @@ impl ShardFileManager { let mut bytes = 0; { let lg = self.current_state.read().await; - bytes += lg.shard.stored_bytes(); + bytes += lg.stored_bytes(); } for ksc in self.shard_bookkeeper.read().await.shard_collections.iter() { @@ -512,7 +500,7 @@ mod tests { #[allow(clippy::type_complexity)] pub async fn fill_with_specific_shard( - shard: &mut ShardFileManager, + shard: &ShardFileManager, in_mem_shard: &mut MDBInMemoryShard, cas_nodes: &[(u64, &[(u64, u32)])], file_nodes: &[(u64, &[(u64, (u32, u32))])], @@ -567,11 +555,11 @@ mod tests { let mut rng = StdRng::seed_from_u64(seed); let shard_dir = shard_dir.as_ref(); - let mut sfm = ShardFileManager::load_dir(shard_dir, false).await?; + let sfm = ShardFileManager::load_dir(shard_dir, false).await?; let mut reference_shard = MDBInMemoryShard::default(); for _ in 0..n_shards { - fill_with_random_shard(&mut sfm, &mut reference_shard, rng.gen(), cas_block_sizes, file_chunk_range_sizes) + fill_with_random_shard(&sfm, &mut reference_shard, rng.gen(), cas_block_sizes, file_chunk_range_sizes) .await?; sfm.flush().await?; @@ -581,7 +569,7 @@ } async fn fill_with_random_shard( - shard: &mut ShardFileManager, + shard: &Arc<ShardFileManager>, in_mem_shard: &mut MDBInMemoryShard, seed: u64, cas_block_sizes: &[usize], @@ -696,15 +684,20 @@ Ok(()) } + + async fn sfm_with_target_shard_size(path: impl AsRef<Path>, target_size: u64) -> Result<Arc<ShardFileManager>> { + ShardFileManager::builder(path).with_target_size(target_size).build().await + } + #[tokio::test] async fn test_basic_retrieval() -> Result<()> { let tmp_dir = TempDir::new("gitxet_shard_test_1")?; let mut mdb_in_mem = MDBInMemoryShard::default(); { - let mut mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; - fill_with_specific_shard(&mut mdb, &mut mdb_in_mem, &[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])]).await?; + fill_with_specific_shard(&mdb, &mut mdb_in_mem, &[(0, &[(11, 5)])], &[(100, &[(200, (0, 5))])]).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; @@ -782,16 +775,10 @@ for sesh in 0..3 { for i in 0..10 { { - let mut mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await.unwrap(); - fill_with_random_shard( - &mut mdb, - &mut mdb_in_mem, - 100 * sesh + i, - &[1, 5, 10, 8], - &[4, 3, 5, 9, 4, 6], - ) - .await - .unwrap(); + let mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await.unwrap(); + fill_with_random_shard(&mdb, &mut mdb_in_mem, 100 * sesh + i, &[1, 5, 10, 8], &[4, 3, 5, 9,
4, 6]) + .await + .unwrap(); verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await.unwrap(); @@ -838,18 +825,16 @@ const T: u64 = 10000; { - let mut mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; - mdb.set_target_shard_min_size(T); // Set the targe shard size really low - fill_with_random_shard(&mut mdb, &mut mdb_in_mem, 0, &[16; 16], &[16; 16]).await?; + let mdb = sfm_with_target_shard_size(tmp_dir.path(), T).await?; + fill_with_random_shard(&mdb, &mut mdb_in_mem, 0, &[16; 16], &[16; 16]).await?; mdb.flush().await?; } { - let mut mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; + let mdb = sfm_with_target_shard_size(tmp_dir.path(), 2 * T).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; - mdb.set_target_shard_min_size(2 * T); - fill_with_random_shard(&mut mdb, &mut mdb_in_mem, 1, &[25; 25], &[25; 25]).await?; + fill_with_random_shard(&mdb, &mut mdb_in_mem, 1, &[25; 25], &[25; 25]).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; @@ -891,9 +876,8 @@ const T: u64 = 4096; for i in 0..5 { - let mut mdb = ShardFileManager::load_dir(tmp_dir.path(), false).await?; - mdb.set_target_shard_min_size(T); // Set the targe shard size really low - fill_with_random_shard(&mut mdb, &mut mdb_in_mem, i, &[5; 25], &[5; 25]).await?; + let mdb = sfm_with_target_shard_size(tmp_dir.path(), T).await?; + fill_with_random_shard(&mdb, &mut mdb_in_mem, i, &[5; 25], &[5; 25]).await?; verify_mdb_shards_match(&mdb, &mdb_in_mem, true).await?; @@ -911,10 +895,9 @@ // Now, do a new shard that has less let mut last_num_files = None; let mut target_size = T; + loop { - // Now, make sure that this happens if this directory is opened up - let mut mdb2 = ShardFileManager::load_dir(tmp_dir.path(), false).await?; - mdb2.set_target_shard_min_size(target_size); + let mdb2 = sfm_with_target_shard_size(tmp_dir.path(), target_size).await?; // Make sure it's all in there this round. verify_mdb_shards_match(&mdb2, &mdb_in_mem, true).await?; @@ -954,7 +937,6 @@ // First, load all of these with a shard file manager and check them. { let shard_file_manager = ShardFileManager::load_dir(tmp_dir_path, false).await?; - shard_file_manager.load_and_cleanup_shards_by_path(&[tmp_dir_path]).await?; verify_mdb_shards_match(&shard_file_manager, &ref_shard, true).await?; } @@ -1005,9 +987,6 @@ // Now, verify that everything still works great. let shard_file_manager = ShardFileManager::load_dir(tmp_dir_path_keyed, false).await?; - shard_file_manager - .load_and_cleanup_shards_by_path(&[tmp_dir_path_keyed]) - .await?; verify_mdb_shards_match(&shard_file_manager, &ref_shard, include_info).await?; } @@ -1015,6 +994,14 @@ Ok(()) } + async fn shard_list_with_timestamp_filtering(path: &Path, del_buffer: u64) -> Result<Vec<Arc<MDBShardFile>>> { + let build_params = ShardFileManager::builder(path) + .with_expired_shard_cleanup(true) + .with_shard_expiration_delete_buffer(del_buffer); + + Ok(ShardFileManager::new_from_builder(build_params).await?.1) + } + #[tokio::test] async fn test_timestamp_filtering() -> Result<()> { let tmp_dir = TempDir::new("shard_test_timestamp")?; @@ -1039,11 +1026,7 @@ .unwrap(); { - // Make sure this one gets loaded properly.
- let shard_file_manager = ShardFileManager::new(tmp_dir_path_keyed, false, false).await?; - let loaded_shards = shard_file_manager - .register_shards_by_path(&[tmp_dir_path_keyed], false, Duration::new(100, 0)) - .await?; + let loaded_shards = shard_list_with_timestamp_filtering(tmp_dir_path_keyed, 100).await?; assert_eq!(loaded_shards.len(), 1); assert_eq!(loaded_shards[0].shard_hash, out.shard_hash) @@ -1053,11 +1036,7 @@ mod tests { std::thread::sleep(Duration::new(1, 250000000)); { - // Now, it shouldn't load any of them. - let shard_file_manager = ShardFileManager::new(tmp_dir_path_keyed, false, false).await?; - let loaded_shards = shard_file_manager - .register_shards_by_path(&[tmp_dir_path_keyed], false, Duration::new(100, 0)) - .await?; + let loaded_shards = shard_list_with_timestamp_filtering(tmp_dir_path_keyed, 100).await?; assert!(loaded_shards.is_empty()); @@ -1067,13 +1046,10 @@ mod tests { assert_eq!(n_files, 1); } - // Try again, but allow deletion. Make sure it gets cleaned up. + // Try again, but allow deletion with 0 second buffer window. Make sure it gets cleaned up. { - // Now, it shouldn't load any of them. - let shard_file_manager = ShardFileManager::new(tmp_dir_path_keyed, false, false).await?; - let loaded_shards = shard_file_manager - .register_shards_by_path(&[tmp_dir_path_keyed], true, Duration::new(0, 0)) - .await?; + // Now with zero deletion buffer time, so it should delete these shards + let loaded_shards = shard_list_with_timestamp_filtering(tmp_dir_path_keyed, 0).await?; assert!(loaded_shards.is_empty()); let n_files = std::fs::read_dir(tmp_dir_path_keyed)?.map(|p| p.unwrap().path()).count();